This is an automated email from the ASF dual-hosted git repository. yiyang0203 pushed a commit to branch HDDS-5713 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 3529de4e28401745791a4f6b438a163320dfce91 Author: Symious <[email protected]> AuthorDate: Tue Aug 16 10:40:22 2022 +0800 HDDS-7106. [DiskBalancer] Client-SCM interface (#3663) --- .../apache/hadoop/hdds/scm/client/ScmClient.java | 41 +++++ .../protocol/StorageContainerLocationProtocol.java | 30 ++++ .../scm/storage/DiskBalancerConfiguration.java | 165 +++++++++++++++++++ .../org/apache/hadoop/hdds/conf/ConfigTag.java | 3 +- ...inerLocationProtocolClientSideTranslatorPB.java | 55 +++++++ .../src/main/proto/ScmAdminProtocol.proto | 21 +++ .../interface-client/src/main/proto/hdds.proto | 13 ++ .../hadoop/hdds/scm/node/DiskBalancerManager.java | 176 +++++++++++++++++++++ .../hadoop/hdds/scm/node/DiskBalancerStatus.java | 50 ++++++ .../org/apache/hadoop/hdds/scm/node/NodeUtils.java | 150 ++++++++++++++++++ ...inerLocationProtocolServerSideTranslatorPB.java | 33 +++- .../hdds/scm/server/SCMClientProtocolServer.java | 50 ++++++ .../hdds/scm/server/StorageContainerManager.java | 8 + .../hdds/scm/node/TestDiskBalancerManager.java | 99 ++++++++++++ .../hdds/scm/cli/ContainerOperationClient.java | 34 ++++ .../hadoop/ozone/scm/node/TestDiskBalancer.java | 90 +++++++++++ 16 files changed, 1016 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index fb5a2deee2..478d08dff0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -454,4 +454,45 @@ public interface ScmClient extends Closeable { String scmId) throws IOException; String getMetrics(String query) throws IOException; + + /** + * Get DiskBalancer report. + * @param count number of top datanodes that need balancing + * @return List of DatanodeDiskBalancerInfo. 
+ * @throws IOException + */ + List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport( + int count) throws IOException; + + /** + * Get DiskBalancer status. + * @param hosts If hosts is not null, return status of hosts; If hosts is + * null, return status of all datanodes in balancing. + * @return List of DatanodeDiskBalancerInfo. + * @throws IOException + */ + List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( + Optional<List<String>> hosts) throws IOException; + + /** + * Start DiskBalancer. + */ + void startDiskBalancer( + Optional<Double> threshold, + Optional<Double> bandwidth, + Optional<List<String>> hosts) throws IOException; + + /** + * Stop DiskBalancer. + */ + void stopDiskBalancer(Optional<List<String>> hosts) throws IOException; + + + /** + * Update DiskBalancer Configuration. + */ + void updateDiskBalancerConfiguration( + Optional<Double> threshold, + Optional<Double> bandwidth, + Optional<List<String>> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 663f317a3b..2a7969049d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -476,4 +476,34 @@ public interface StorageContainerLocationProtocol extends Closeable { String scmId) throws IOException; String getMetrics(String query) throws IOException; + + List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport( + int count, int clientVersion) throws IOException; + + /** + * Get DiskBalancer status. 
+ */ + List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( + Optional<List<String>> hosts, int clientVersion) throws IOException; + + /** + * Start DiskBalancer. + */ + void startDiskBalancer( + Optional<Double> threshold, + Optional<Double> bandwidth, + Optional<List<String>> hosts) throws IOException; + + /** + * Stop DiskBalancer. + */ + void stopDiskBalancer(Optional<List<String>> hosts) throws IOException; + + /** + * Update DiskBalancer Configuration. + */ + void updateDiskBalancerConfiguration( + Optional<Double> threshold, + Optional<Double> bandwidth, + Optional<List<String>> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java new file mode 100644 index 0000000000..54709e4c17 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.storage; + +import jakarta.annotation.Nonnull; +import org.apache.hadoop.hdds.conf.Config; +import org.apache.hadoop.hdds.conf.ConfigGroup; +import org.apache.hadoop.hdds.conf.ConfigTag; +import org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class contains configuration values for the DiskBalancer. + */ +@ConfigGroup(prefix = "hdds.datanode.disk.balancer") +public final class DiskBalancerConfiguration { + private static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerConfiguration.class); + + @Config(key = "volume.density.threshold", type = ConfigType.AUTO, + defaultValue = "10", tags = {ConfigTag.DISKBALANCER}, + description = "Threshold is a percentage in the range of 0 to 100. A " + + "datanode is considered balanced if for each volume, the " + + "utilization of the volume(used space to capacity ratio) differs" + + " from the utilization of the datanode(used space to capacity ratio" + + " of the entire datanode) no more than the threshold.") + private double threshold = 10d; + + @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO, + defaultValue = "10", tags = {ConfigTag.DISKBALANCER}, + description = "The max balance speed.") + private double diskBandwidth = 10; + + @Config(key = "parallel.thread", type = ConfigType.AUTO, + defaultValue = "5", tags = {ConfigTag.DISKBALANCER}, + description = "The max parallel balance thread count.") + private int parallelThread = 5; + + /** + * Gets the threshold value for DiskBalancer. + * + * @return percentage value in the range 0 to 100 + */ + public double getThreshold() { + return threshold; + } + + public double getThresholdAsRatio() { + return threshold / 100; + } + + /** + * Sets the threshold value for Disk Balancer. 
+ * + * @param threshold a percentage value in the range 0 to 100 + */ + public void setThreshold(double threshold) { + if (threshold < 0d || threshold >= 100d) { + throw new IllegalArgumentException( + "Threshold must be a percentage(double) in the range 0 to 100."); + } + this.threshold = threshold; + } + + /** + * Gets the disk bandwidth value for Disk Balancer. + * + * @return max disk bandwidth per second + */ + public double getDiskBandwidth() { + return diskBandwidth; + } + + /** + * Sets the disk bandwidth value for Disk Balancer. + * + * @param diskBandwidth the bandwidth to control balance speed + */ + public void setDiskBandwidth(double diskBandwidth) { + if (diskBandwidth <= 0d) { + throw new IllegalArgumentException( + "diskBandwidth must be a value larger than 0."); + } + this.diskBandwidth = diskBandwidth; + } + + /** + * Gets the parallel thread for Disk Balancer. + * + * @return parallel thread + */ + public int getParallelThread() { + return parallelThread; + } + + /** + * Sets the parallel thread for Disk Balancer. 
+ * + * @param parallelThread the parallel thread count + */ + public void setParallelThread(int parallelThread) { + if (parallelThread <= 0) { + throw new IllegalArgumentException( + "parallelThread must be a value larger than 0."); + } + this.parallelThread = parallelThread; + } + @Override + public String toString() { + return String.format("Disk Balancer Configuration values:%n" + + "%-50s %s%n" + + "%-50s %s%n" + + "%-50s %s%n" + + "%-50s %s%n", + "Key", "Value", + "Threshold", threshold, "Max disk bandwidth", diskBandwidth, + "Parallel Thread", parallelThread); + } + + public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() { + HddsProtos.DiskBalancerConfigurationProto.Builder builder = + HddsProtos.DiskBalancerConfigurationProto.newBuilder(); + + builder.setThreshold(threshold) + .setDiskBandwidth(diskBandwidth) + .setParallelThread(parallelThread); + return builder; + } + + static DiskBalancerConfiguration fromProtobuf( + @Nonnull HddsProtos.DiskBalancerConfigurationProto proto, + @Nonnull OzoneConfiguration ozoneConfiguration) { + DiskBalancerConfiguration config = + ozoneConfiguration.getObject(DiskBalancerConfiguration.class); + if (proto.hasThreshold()) { + config.setThreshold(proto.getThreshold()); + } + if (proto.hasDiskBandwidth()) { + config.setDiskBandwidth(proto.getDiskBandwidth()); + } + if (proto.hasParallelThread()) { + config.setParallelThread(proto.getParallelThread()); + } + return config; + } +} diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java index 24feb69389..d4817ea592 100644 --- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java +++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java @@ -52,5 +52,6 @@ public enum ConfigTag { TLS, TOKEN, UPGRADE, - X509 + X509, + DISKBALANCER } diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 109358c67b..c1a48cf18f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -47,6 +47,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto; @@ -1114,6 +1117,58 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB return response.getContainerCount(); } + public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport( + int count, int clientVersion) throws 
IOException { + DatanodeDiskBalancerInfoRequestProto request = + DatanodeDiskBalancerInfoRequestProto.newBuilder() + .setInfoType(DatanodeDiskBalancerInfoType.report) + .setCount(count) + .build(); + + DatanodeDiskBalancerInfoResponseProto response = + submitRequest(Type.DatanodeDiskBalancerInfo, + builder -> builder.setDatanodeDiskBalancerInfoRequest(request)) + .getDatanodeDiskBalancerInfoResponse(); + + return response.getInfoList(); + } + + @Override + public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( + Optional<List<String>> hosts, int clientVersion) throws IOException { + DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder = + DatanodeDiskBalancerInfoRequestProto.newBuilder() + .setInfoType(DatanodeDiskBalancerInfoType.status); + hosts.ifPresent(requestBuilder::addAllHosts); + DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build(); + + DatanodeDiskBalancerInfoResponseProto response = + submitRequest(Type.DatanodeDiskBalancerInfo, + builder -> builder.setDatanodeDiskBalancerInfoRequest(request)) + .getDatanodeDiskBalancerInfoResponse(); + + return response.getInfoList(); + } + + @Override + public void startDiskBalancer(Optional<Double> threshold, + Optional<Double> bandwidth, Optional<List<String>> hosts) + throws IOException { + + } + + @Override + public void stopDiskBalancer(Optional<List<String>> hosts) + throws IOException { + + } + + @Override + public void updateDiskBalancerConfiguration(Optional<Double> threshold, + Optional<Double> bandwidth, Optional<List<String>> hosts) + throws IOException { + } + @Override public Object getUnderlyingProxyObject() { return rpcProxy; diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index e8b8d62394..4730a4ccc7 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -84,6 
+84,7 @@ message ScmContainerLocationRequest { optional SingleNodeQueryRequestProto singleNodeQueryRequest = 45; optional GetContainersOnDecomNodeRequestProto getContainersOnDecomNodeRequest = 46; optional GetMetricsRequestProto getMetricsRequest = 47; + optional DatanodeDiskBalancerInfoRequestProto DatanodeDiskBalancerInfoRequest = 48; } message ScmContainerLocationResponse { @@ -139,6 +140,7 @@ message ScmContainerLocationResponse { optional SingleNodeQueryResponseProto singleNodeQueryResponse = 45; optional GetContainersOnDecomNodeResponseProto getContainersOnDecomNodeResponse = 46; optional GetMetricsResponseProto getMetricsResponse = 47; + optional DatanodeDiskBalancerInfoResponseProto DatanodeDiskBalancerInfoResponse = 48; enum Status { OK = 1; @@ -193,6 +195,7 @@ enum Type { SingleNodeQuery = 41; GetContainersOnDecomNode = 42; GetMetrics = 43; + DatanodeDiskBalancerInfo = 44; } /** @@ -357,6 +360,24 @@ message DatanodeUsageInfoResponseProto { repeated DatanodeUsageInfoProto info = 1; } +enum DatanodeDiskBalancerInfoType{ + report = 1; + status = 2; +} + +/* + Datanode disk balancer status request message. 
+*/ +message DatanodeDiskBalancerInfoRequestProto { + required DatanodeDiskBalancerInfoType infoType = 1; + optional uint32 count = 2; + repeated string hosts = 3; +} + +message DatanodeDiskBalancerInfoResponseProto { + repeated DatanodeDiskBalancerInfoProto info = 1; +} + /* Decommission a list of hosts */ diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 4058453123..803842ed2d 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -521,3 +521,16 @@ message InnerNode { optional uint32 numOfLeaves = 2; repeated ChildrenMap childrenMap = 3; } + +message DiskBalancerConfigurationProto { + optional double threshold = 1; + optional double diskBandwidth = 2; + optional int32 parallelThread = 3; +} + +message DatanodeDiskBalancerInfoProto { + required DatanodeDetailsProto node = 1; + required double currentVolumeDensitySum = 2; + optional bool diskBalancerRunning = 3; + optional DiskBalancerConfigurationProto diskBalancerConf = 4; +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java new file mode 100644 index 0000000000..0b16c1aa95 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; + +/** + * Maintains information about the DiskBalancer on SCM side. + */ +public class DiskBalancerManager { + public static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerManager.class); + + private final EventPublisher scmNodeEventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private Map<DatanodeDetails, DiskBalancerStatus> statusMap; + private boolean useHostnames; + + /** + * Constructs DiskBalancer Manager. 
+ */ + public DiskBalancerManager(OzoneConfiguration conf, + EventPublisher eventPublisher, + SCMContext scmContext, + NodeManager nodeManager) { + this.scmNodeEventPublisher = eventPublisher; + this.scmContext = scmContext; + this.nodeManager = nodeManager; + this.useHostnames = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, + DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); + this.statusMap = new ConcurrentHashMap<>(); + } + + public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport( + int count, int clientVersion) throws IOException { + + List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList = + new ArrayList<>(); + + for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE, + HddsProtos.NodeState.HEALTHY)) { + double volumeDensitySum = + getVolumeDataDensitySumForDatanodeDetails(datanodeDetails); + reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder() + .setCurrentVolumeDensitySum(volumeDensitySum) + .setNode(datanodeDetails.toProto(clientVersion)) + .build()); + } + + reportList.sort((t1, t2) -> Double.compare(t2.getCurrentVolumeDensitySum(), + t1.getCurrentVolumeDensitySum())); + return reportList.stream().limit(count).collect(Collectors.toList()); + } + + /** + * If hosts is present and non-empty, return status of those hosts; + * otherwise, return status of all datanodes with DiskBalancer running. 
+ */ + public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( + Optional<List<String>> hosts, int clientVersion) throws IOException { + List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList = + new ArrayList<>(); + List<DatanodeDetails> filterDns = null; + if (hosts.isPresent() && !hosts.get().isEmpty()) { + filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + useHostnames); + } + + for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE, + HddsProtos.NodeState.HEALTHY)) { + if (shouldReturnDatanode(filterDns, datanodeDetails)) { + double volumeDensitySum = + getVolumeDataDensitySumForDatanodeDetails(datanodeDetails); + statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder() + .setCurrentVolumeDensitySum(volumeDensitySum) + .setDiskBalancerRunning(isRunning(datanodeDetails)) + .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails, + DiskBalancerStatus.DUMMY_STATUS) + .getDiskBalancerConfiguration().toProtobufBuilder()) + .setNode(datanodeDetails.toProto(clientVersion)) + .build()); + } + } + return statusList; + } + + private boolean shouldReturnDatanode(List<DatanodeDetails> hosts, + DatanodeDetails datanodeDetails) { + if (hosts == null || hosts.isEmpty()) { + return isRunning(datanodeDetails); + } else { + return hosts.contains(datanodeDetails); + } + } + + /** + * Get volume density for a specific DatanodeDetails node. + * + * @param datanodeDetails DatanodeDetails + * @return sum over all volumes of the absolute difference between the volume's utilization and the datanode's overall utilization. 
+ */ + private double getVolumeDataDensitySumForDatanodeDetails( + DatanodeDetails datanodeDetails) { + Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo); + + DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails; + + double totalCapacity = 0d, totalUsed = 0d; + for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto : + datanodeInfo.getStorageReports()) { + totalCapacity += reportProto.getCapacity(); + totalUsed += reportProto.getScmUsed(); + } + + Preconditions.checkArgument(totalCapacity != 0); + double idealUsage = totalUsed / totalCapacity; + + double volumeDensitySum = datanodeInfo.getStorageReports().stream() + .map(report -> + Math.abs((double)report.getScmUsed() / report.getCapacity() + - idealUsage)) + .mapToDouble(Double::valueOf).sum(); + + return volumeDensitySum; + } + + private boolean isRunning(DatanodeDetails datanodeDetails) { + return statusMap + .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS) + .isRunning(); + } + + @VisibleForTesting + public void addRunningDatanode(DatanodeDetails datanodeDetails) { + statusMap.put(datanodeDetails, new DiskBalancerStatus(true, + new DiskBalancerConfiguration())); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java new file mode 100644 index 0000000000..a064a6ff9f --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Maintains DiskBalancerConfiguration and isRunning. + */ +public class DiskBalancerStatus { + public static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerStatus.class); + + private boolean isRunning; + private DiskBalancerConfiguration diskBalancerConfiguration; + + public static final DiskBalancerStatus DUMMY_STATUS = + new DiskBalancerStatus(false, new DiskBalancerConfiguration()); + + public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) { + this.isRunning = isRunning; + this.diskBalancerConfiguration = conf; + } + + public boolean isRunning() { + return isRunning; + } + + public DiskBalancerConfiguration getDiskBalancerConfiguration() { + return diskBalancerConfiguration; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java new file mode 100644 index 0000000000..6df74d3746 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.LinkedList; +import java.util.List; + +/** + * Util class for Node operations. + */ +public final class NodeUtils { + + private static final Logger LOG = LoggerFactory.getLogger(NodeUtils.class); + + private NodeUtils() { + } + + public static List<DatanodeDetails> mapHostnamesToDatanodes( + NodeManager nodeManager, List<String> hosts, boolean useHostnames) + throws InvalidHostStringException { + List<DatanodeDetails> results = new LinkedList<>(); + for (String hostString : hosts) { + HostDefinition host = new HostDefinition(hostString); + InetAddress addr; + try { + addr = InetAddress.getByName(host.getHostname()); + } catch (UnknownHostException e) { + throw new InvalidHostStringException("Unable to resolve host " + + host.getRawHostname(), e); + } + String dnsName; + if (useHostnames) { + dnsName = addr.getHostName(); + } else { + dnsName = addr.getHostAddress(); + } + List<DatanodeDetails> found = nodeManager.getNodesByAddress(dnsName); + if (found.size() == 0) { + throw new InvalidHostStringException("Host " + host.getRawHostname() + + " (" + dnsName + ") is not running any 
datanodes registered" + + " with SCM." + + " Please check the host name."); + } else if (found.size() == 1) { + if (host.getPort() != -1 && + !validateDNPortMatch(host.getPort(), found.get(0))) { + throw new InvalidHostStringException("Host " + host.getRawHostname() + + " is running a datanode registered with SCM," + + " but the port number doesn't match." + + " Please check the port number."); + } + results.add(found.get(0)); + } else if (found.size() > 1) { + DatanodeDetails match = null; + for (DatanodeDetails dn : found) { + if (validateDNPortMatch(host.getPort(), dn)) { + match = dn; + break; + } + } + if (match == null) { + throw new InvalidHostStringException("Host " + host.getRawHostname() + + " is running multiple datanodes registered with SCM," + + " but no port numbers match." + + " Please check the port number."); + } + results.add(match); + } + } + return results; + } + + /** + * Check if the passed port is used by the given DatanodeDetails object. If + * it is, return true, otherwise return false. + * @param port Port number to check if it is used by the datanode + * @param dn Datanode to check if it is using the given port + * @return True if port is used by the datanode. False otherwise. 
+ */ + private static boolean validateDNPortMatch(int port, DatanodeDetails dn) { + for (DatanodeDetails.Port p : dn.getPorts()) { + if (p.getValue() == port) { + return true; + } + } + return false; + } + static class HostDefinition { + private String rawHostname; + private String hostname; + private int port; + + HostDefinition(String hostname) throws InvalidHostStringException { + this.rawHostname = hostname; + parseHostname(); + } + + public String getRawHostname() { + return rawHostname; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + private void parseHostname() throws InvalidHostStringException { + try { + // A URI *must* have a scheme, so just create a fake one + URI uri = new URI("empty://" + rawHostname.trim()); + this.hostname = uri.getHost(); + this.port = uri.getPort(); + + if (this.hostname == null) { + throw new InvalidHostStringException("The string " + rawHostname + + " does not contain a value hostname or hostname:port definition"); + } + } catch (URISyntaxException e) { + throw new InvalidHostStringException( + "Unable to parse the hoststring " + rawHostname, e); + } + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index a44536bf44..71a8edad41 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
@@ -722,6 +723,14 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
           .setStatus(Status.OK)
           .setGetMetricsResponse(getMetrics(request.getGetMetricsRequest()))
           .build();
+      case DatanodeDiskBalancerInfo:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setDatanodeDiskBalancerInfoResponse(getDatanodeDiskBalancerInfo(
+                request.getDatanodeDiskBalancerInfoRequest(),
+                request.getVersion()))
+            .build();
       default:
         throw new IllegalArgumentException(
             "Unknown command type: " + request.getCmdType());
@@ -1293,7 +1302,45 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   public DecommissionScmResponseProto decommissionScm(
       DecommissionScmRequestProto request) throws IOException {
     return impl.decommissionScm(
-            request.getScmId());
+        request.getScmId());
+  }
+
+  /**
+   * Answers a DiskBalancer info request by dispatching on its info type:
+   * {@code report} forwards the requested count to
+   * {@code impl.getDiskBalancerReport}, {@code status} forwards the host list
+   * to {@code impl.getDiskBalancerStatus}.
+   *
+   * @param request the DiskBalancer info request
+   * @param clientVersion version passed through to the server calls
+   * @return response holding the info entries returned by the server
+   * @throws IOException if one of the server calls fails
+   * @throws IllegalArgumentException if the info type is neither
+   *     {@code report} nor {@code status}
+   */
+  public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo(
+      StorageContainerLocationProtocolProtos.
+          DatanodeDiskBalancerInfoRequestProto request, int clientVersion)
+      throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> infoProtoList;
+    switch (request.getInfoType()) {
+    case report:
+      infoProtoList = impl.getDiskBalancerReport(request.getCount(),
+          clientVersion);
+      break;
+    case status:
+      infoProtoList = impl.getDiskBalancerStatus(
+          Optional.of(request.getHostsList()), clientVersion);
+      break;
+    default:
+      // Fail with a descriptive error; a null list handed to addAllInfo()
+      // would otherwise surface as a bare NullPointerException.
+      throw new IllegalArgumentException(
+          "Unknown DiskBalancer info type: " + request.getInfoType());
+    }
+    return DatanodeDiskBalancerInfoResponseProto.newBuilder()
+        .addAllInfo(infoProtoList)
+        .build();
   }
 
   public GetMetricsResponseProto getMetrics(GetMetricsRequestProto request) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2df2a4847e..6da9907db3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1274,6 +1274,56 @@ public class SCMClientProtocolServer implements
         ContainerID.valueOf(startContainerID), count, state);
   }
 
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+    // check admin authorisation
+    try {
+      getScm().checkAdminAccess(getRemoteUser(), true);
+    } catch (IOException e) {
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+
+    return scm.getDiskBalancerManager().getDiskBalancerReport(count,
+        clientVersion);
+  }
+
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    // check admin authorisation
+    try {
+      getScm().checkAdminAccess(getRemoteUser(), true);
+    } catch (IOException e) {
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+
+    return 
scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, + clientVersion); + } + + @Override + public void startDiskBalancer(Optional<Double> threshold, + Optional<Double> bandwidth, Optional<List<String>> hosts) + throws IOException { + // TODO: Send message to datanodes + } + + @Override + public void stopDiskBalancer(Optional<List<String>> hosts) + throws IOException { + // TODO: Send message to datanodes + } + + + @Override + public void updateDiskBalancerConfiguration(Optional<Double> threshold, + Optional<Double> bandwidth, Optional<List<String>> hosts) + throws IOException { + // TODO: Send message to datanodes + } + /** * Queries a list of Node that match a set of statuses. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 11fdc0d16d..7b14cf2e1b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl; import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.node.DiskBalancerManager; import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler; import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService; import org.apache.hadoop.hdds.scm.security.RootCARotationManager; @@ -259,6 +260,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private FinalizationManager finalizationManager; private HDDSLayoutVersionManager scmLayoutVersionManager; private LeaseManager<Object> leaseManager; + private DiskBalancerManager diskBalancerManager; private SCMMetadataStore scmMetadataStore; private 
CertificateStore certificateStore; @@ -854,6 +856,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer()) .setRatisServer(scmHAManager.getRatisServer()) .build(); + diskBalancerManager = new DiskBalancerManager(conf, eventQueue, scmContext, + scmNodeManager); } /** @@ -2007,6 +2011,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl return serviceManager; } + public DiskBalancerManager getDiskBalancerManager() { + return diskBalancerManager; + } + /** * Force SCM out of safe mode. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java new file mode 100644 index 0000000000..541c9764b7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.ClientVersion; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Unit tests for the DiskBalancer manager. + */ + +public class TestDiskBalancerManager { + + private DiskBalancerManager diskBalancerManager; + private NodeManager nodeManager; + private OzoneConfiguration conf; + private String storageDir; + + @BeforeEach + public void setup() throws Exception { + conf = new OzoneConfiguration(); + storageDir = GenericTestUtils.getTempPath( + TestDiskBalancerManager.class.getSimpleName() + UUID.randomUUID()); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir); + nodeManager = new MockNodeManager(true, 3); + diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(), + SCMContext.emptyContext(), nodeManager); + } + + @Test + public void testDatanodeDiskBalancerReport() throws IOException { + List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList = + diskBalancerManager.getDiskBalancerReport(2, + ClientVersion.CURRENT_VERSION); + + Assertions.assertEquals(2, reportProtoList.size()); + Assertions.assertTrue( + reportProtoList.get(0).getCurrentVolumeDensitySum() + >= reportProtoList.get(1).getCurrentVolumeDensitySum()); + } + + @Test + public void testDatanodeDiskBalancerStatus() throws IOException { + 
diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0));
+
+    // Simulate users asking all status of 3 datanodes
+    List<String> dns = nodeManager.getAllNodes().stream().map(
+        DatanodeDetails::getIpAddress).collect(
+        Collectors.toList());
+
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
+        diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            ClientVersion.CURRENT_VERSION);
+
+    Assertions.assertEquals(3, statusProtoList.size());
+
+    // Simulate users asking status of 1 datanodes
+    dns = nodeManager.getAllNodes().stream().map(
+        DatanodeDetails::getIpAddress).limit(1).collect(
+        Collectors.toList());
+
+    statusProtoList =
+        diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            ClientVersion.CURRENT_VERSION);
+
+    Assertions.assertEquals(1, statusProtoList.size());
+  }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 499d58b1ff..cb3fdb42f8 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -568,4 +568,43 @@ public class ContainerOperationClient implements ScmClient {
     return storageContainerLocationClient.getMetrics(query);
   }
 
+  /**
+   * Delegates to the storage container location client, forwarding
+   * {@code count} together with the current client version.
+   */
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count) throws IOException {
+    return storageContainerLocationClient.getDiskBalancerReport(count,
+        ClientVersion.CURRENT_VERSION);
+  }
+
+  @Override
+  public void startDiskBalancer(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    storageContainerLocationClient.startDiskBalancer(threshold, bandwidth,
+        hosts);
+  }
+
+  @Override
+  public void stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException {
+    
storageContainerLocationClient.stopDiskBalancer(hosts); + } + + @Override + public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( + Optional<List<String>> hosts) throws IOException { + return storageContainerLocationClient.getDiskBalancerStatus(hosts, + ClientVersion.CURRENT_VERSION); + } + + @Override + public void updateDiskBalancerConfiguration(Optional<Double> threshold, + Optional<Double> bandwidth, Optional<List<String>> hosts) + throws IOException { + storageContainerLocationClient.updateDiskBalancerConfiguration(threshold, + bandwidth, hosts); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java new file mode 100644 index 0000000000..c047cd3ee0 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.scm.node; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * This class tests disk balancer operations. 
+ */
+@Timeout(300)
+public class TestDiskBalancer {
+
+  private static ScmClient storageClient;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConf;
+
+  /**
+   * Starts a three-datanode mini cluster, creates the SCM client and feeds
+   * each datanode a randomly generated storage report.
+   */
+  @BeforeAll
+  public static void setup() throws Exception {
+    ozoneConf = new OzoneConfiguration();
+    ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+    cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
+    storageClient = new ContainerOperationClient(ozoneConf);
+    cluster.waitForClusterToBeReady();
+
+    // Presumably gives the DiskBalancer report per-volume data to rank.
+    for (DatanodeDetails dn: cluster.getStorageContainerManager()
+        .getScmNodeManager().getAllNodes()) {
+      ((DatanodeInfo) dn).updateStorageReports(
+          HddsTestUtils.getRandomNodeReport(3, 1).getStorageReportList());
+    }
+  }
+
+  /**
+   * Closes the SCM client and shuts the cluster down; the cluster is shut
+   * down even if closing the client fails.
+   */
+  @AfterAll
+  public static void cleanup() throws Exception {
+    try {
+      // ScmClient is Closeable; release it instead of leaking the client.
+      if (storageClient != null) {
+        storageClient.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Requests a report with 2 entries and checks that the first entry's
+   * volume density sum is not smaller than the second's.
+   */
+  @Test
+  public void testDatanodeDiskBalancerReport() throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList =
+        storageClient.getDiskBalancerReport(2);
+
+    assertEquals(2, reportProtoList.size());
+    assertTrue(
+        reportProtoList.get(0).getCurrentVolumeDensitySum()
+            >= reportProtoList.get(1).getCurrentVolumeDensitySum());
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerStatus() throws IOException {
+    // TODO: Test status command with datanodes in balancing
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
