This is an automated email from the ASF dual-hosted git repository. yiyang0203 pushed a commit to branch HDDS-5713 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 35775575f18e04735dcd36d2a74637fc9a6f2be5 Author: Symious <[email protected]> AuthorDate: Thu Feb 23 17:10:58 2023 +0800 HDDS-7383. Basic framework of DiskBalancerService (#3874) --- .../scm/storage/DiskBalancerConfiguration.java | 85 ++++- .../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 + .../hadoop/hdds/fs/MockSpaceUsageCheckFactory.java | 16 + .../commandhandler/DiskBalancerCommandHandler.java | 39 +- .../container/diskbalancer/DiskBalancerInfo.java | 130 +++++++ .../diskbalancer/DiskBalancerService.java | 397 +++++++++++++++++++++ .../diskbalancer/DiskBalancerServiceMetrics.java | 138 +++++++ .../diskbalancer/DiskBalancerVersion.java | 79 ++++ .../container/diskbalancer/DiskBalancerYaml.java | 165 +++++++++ .../ozone/container/diskbalancer/package-info.java | 22 ++ .../ozone/container/ozoneimpl/OzoneContainer.java | 26 +- .../protocol/commands/DiskBalancerCommand.java | 25 +- .../diskbalancer/DiskBalancerServiceTestImpl.java | 104 ++++++ .../diskbalancer/TestDiskBalancerService.java | 173 +++++++++ .../diskbalancer/TestDiskBalancerYaml.java | 53 +++ ...inerLocationProtocolClientSideTranslatorPB.java | 7 +- .../src/main/proto/ScmAdminProtocol.proto | 8 +- .../interface-client/src/main/proto/hdds.proto | 6 + .../proto/ScmServerDatanodeHeartbeatProtocol.proto | 2 +- .../hadoop/hdds/scm/node/DiskBalancerManager.java | 141 ++++++++ ...inerLocationProtocolServerSideTranslatorPB.java | 6 +- .../hdds/scm/server/SCMClientProtocolServer.java | 32 +- .../cli/datanode/DiskBalancerStartSubcommand.java | 2 +- .../cli/datanode/DiskBalancerUpdateSubcommand.java | 2 +- 24 files changed, 1624 insertions(+), 37 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java index e460e0b6eb..ed40fa30f6 100644 --- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java @@ -28,6 +28,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.Optional; + +import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; + /** * This class contains configuration values for the DiskBalancer. */ @@ -36,6 +41,17 @@ public final class DiskBalancerConfiguration { private static final Logger LOG = LoggerFactory.getLogger(DiskBalancerConfiguration.class); + @Config(key = "info.dir", type = ConfigType.STRING, + defaultValue = "", tags = {ConfigTag.DISKBALANCER}, + description = "The path where datanode diskBalancer's conf is to be " + + "written to. if this property is not defined, ozone will fall " + + "back to use metadata directory instead.") + private String infoDir; + + public String getDiskBalancerInfoDir() { + return infoDir; + } + @Config(key = "volume.density.threshold", type = ConfigType.DOUBLE, defaultValue = "10", tags = {ConfigTag.DISKBALANCER}, description = "Threshold is a percentage in the range of 0 to 100. A " + @@ -55,6 +71,72 @@ public final class DiskBalancerConfiguration { description = "The max parallel balance thread count.") private int parallelThread = 5; + @Config(key = "should.run.default", + defaultValue = "false", + type = ConfigType.BOOLEAN, + tags = { DATANODE, ConfigTag.DISKBALANCER}, + description = + "If DiskBalancer fails to get information from diskbalancer.info, " + + "it will choose this value to decide if this service should be " + + "running." 
+ ) + private boolean diskBalancerShouldRun = false; + + public boolean getDiskBalancerShouldRun() { + return diskBalancerShouldRun; + } + + public void setDiskBalancerShouldRun(boolean shouldRun) { + this.diskBalancerShouldRun = shouldRun; + } + + @Config(key = "service.interval", + defaultValue = "60s", + type = ConfigType.TIME, + tags = { DATANODE, ConfigTag.DISKBALANCER}, + description = "Time interval of the Datanode DiskBalancer service. " + + "The Datanode will check the service periodically and update " + + "the config and running status for DiskBalancer service. " + + "Unit could be defined with postfix (ns,ms,s,m,h,d). " + ) + private long diskBalancerInterval = Duration.ofSeconds(60).toMillis(); + + public Duration getDiskBalancerInterval() { + return Duration.ofMillis(diskBalancerInterval); + } + + public void setDiskBalancerInterval(Duration duration) { + this.diskBalancerInterval = duration.toMillis(); + } + + @Config(key = "service.timeout", + defaultValue = "300s", + type = ConfigType.TIME, + tags = { DATANODE, ConfigTag.DISKBALANCER}, + description = "Timeout for the Datanode DiskBalancer service. " + + "Unit could be defined with postfix (ns,ms,s,m,h,d). " + ) + private long diskBalancerTimeout = Duration.ofSeconds(300).toMillis(); + + public Duration getDiskBalancerTimeout() { + return Duration.ofMillis(diskBalancerTimeout); + } + + public void setDiskBalancerTimeout(Duration duration) { + this.diskBalancerTimeout = duration.toMillis(); + } + + public DiskBalancerConfiguration() { + } + + public DiskBalancerConfiguration(Optional<Double> threshold, + Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread) { + threshold.ifPresent(aDouble -> this.threshold = aDouble); + bandwidthInMB.ifPresent(aLong -> this.diskBandwidthInMB = aLong); + parallelThread.ifPresent(integer -> this.parallelThread = integer); + } + /** * Gets the threshold value for DiskBalancer. 
* @@ -86,7 +168,8 @@ public final class DiskBalancerConfiguration { * * @return max disk bandwidth per second */ - public double getDiskBandwidthInMB() { + + public long getDiskBandwidthInMB() { return diskBandwidthInMB; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index f3c08b252b..ad12bfa6ee 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -241,6 +241,9 @@ public final class OzoneConsts { */ public static final String OZONE_SCM_DATANODE_ID_FILE_DEFAULT = "datanode.id"; + public static final String + OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT = "diskBalancer.info"; + // The ServiceListJSONServlet context attribute where OzoneManager // instance gets stored. public static final String OM_CONTEXT_ATTRIBUTE = "ozone.om"; diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java index 3b2e6415c2..eaeb3c1b4d 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java @@ -50,6 +50,22 @@ public final class MockSpaceUsageCheckFactory { } } + /** + * An implementation that never checks space usage but reports basically + * 512G free space. Neither does it persist space usage info. 
+ */ + public static class HalfTera implements SpaceUsageCheckFactory { + @Override + public SpaceUsageCheckParams paramsFor(File dir) { + return new SpaceUsageCheckParams(dir, + MockSpaceUsageSource.fixed(512L * 1024 * 1024 * 1024, + 512L * 1024 * 1024 * 1024), + Duration.ZERO, + SpaceUsagePersistence.None.INSTANCE + ); + } + } + private MockSpaceUsageCheckFactory() { throw new UnsupportedOperationException("no instances"); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java index 37f275dbb4..6ff745cafe 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java @@ -16,15 +16,21 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + import 
java.util.concurrent.atomic.AtomicLong; /** @@ -56,7 +62,38 @@ public class DiskBalancerCommandHandler implements CommandHandler { public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { invocationCount.incrementAndGet(); - // TODO: Do start/stop/update operation + final long startTime = Time.monotonicNow(); + DiskBalancerCommand diskBalancerCommand = (DiskBalancerCommand) command; + + final HddsProtos.DiskBalancerOpType opType = + diskBalancerCommand.getOpType(); + final DiskBalancerConfiguration diskBalancerConf = + diskBalancerCommand.getDiskBalancerConfiguration(); + + DiskBalancerInfo diskBalancerInfo = ozoneContainer.getDiskBalancerInfo(); + + try { + switch (opType) { + case START: + diskBalancerInfo.setShouldRun(true); + diskBalancerInfo.updateFromConf(diskBalancerConf); + break; + case STOP: + diskBalancerInfo.setShouldRun(false); + break; + case UPDATE: + diskBalancerInfo.updateFromConf(diskBalancerConf); + break; + default: + throw new IOException("Unexpected type " + opType); + } + ozoneContainer.getDiskBalancerService().refresh(diskBalancerInfo); + } catch (IOException e) { + LOG.error("Can't handle command type: {}", opType, e); + } finally { + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java new file mode 100644 index 0000000000..873c89aad1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.diskbalancer; + +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; + +import java.util.Objects; + +/** + * DiskBalancer's information to persist. + */ +public class DiskBalancerInfo { + private boolean shouldRun; + private double threshold; + private long bandwidthInMB; + private int parallelThread; + private DiskBalancerVersion version; + + public DiskBalancerInfo(boolean shouldRun, double threshold, + long bandwidthInMB, int parallelThread) { + this(shouldRun, threshold, bandwidthInMB, parallelThread, + DiskBalancerVersion.DEFAULT_VERSION); + } + + public DiskBalancerInfo(boolean shouldRun, double threshold, + long bandwidthInMB, int parallelThread, DiskBalancerVersion version) { + this.shouldRun = shouldRun; + this.threshold = threshold; + this.bandwidthInMB = bandwidthInMB; + this.parallelThread = parallelThread; + this.version = version; + } + + public DiskBalancerInfo(boolean shouldRun, + DiskBalancerConfiguration diskBalancerConf) { + this.shouldRun = shouldRun; + this.threshold = diskBalancerConf.getThreshold(); + this.bandwidthInMB = diskBalancerConf.getDiskBandwidthInMB(); + this.parallelThread = diskBalancerConf.getParallelThread(); + this.version = DiskBalancerVersion.DEFAULT_VERSION; + } + + public void updateFromConf(DiskBalancerConfiguration diskBalancerConf) { + if (threshold != 
diskBalancerConf.getThreshold()) { + setThreshold(diskBalancerConf.getThreshold()); + } + if (bandwidthInMB != diskBalancerConf.getDiskBandwidthInMB()) { + setBandwidthInMB(diskBalancerConf.getDiskBandwidthInMB()); + } + if (parallelThread != diskBalancerConf.getParallelThread()) { + setParallelThread(diskBalancerConf.getParallelThread()); + } + } + + public boolean isShouldRun() { + return shouldRun; + } + + public void setShouldRun(boolean shouldRun) { + this.shouldRun = shouldRun; + } + + public double getThreshold() { + return threshold; + } + + public void setThreshold(double threshold) { + this.threshold = threshold; + } + + public long getBandwidthInMB() { + return bandwidthInMB; + } + + public void setBandwidthInMB(long bandwidthInMB) { + this.bandwidthInMB = bandwidthInMB; + } + + public int getParallelThread() { + return parallelThread; + } + + public void setParallelThread(int parallelThread) { + this.parallelThread = parallelThread; + } + + public DiskBalancerVersion getVersion() { + return version; + }; + + public void setVersion(DiskBalancerVersion version) { + this.version = version; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DiskBalancerInfo that = (DiskBalancerInfo) o; + return shouldRun == that.shouldRun && + Double.compare(that.threshold, threshold) == 0 && + bandwidthInMB == that.bandwidthInMB && + parallelThread == that.parallelThread && + version == that.version; + } + + @Override + public int hashCode() { + return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread, + version); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java new file mode 100644 index 0000000000..75e1660029 --- /dev/null +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.diskbalancer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; +import 
org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.util.Time; +import org.apache.ratis.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A per-datanode disk balancing service takes in charge + * of moving contains among disks. + */ +public class DiskBalancerService extends BackgroundService { + + private static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerService.class); + + public static final String DISK_BALANCER_DIR = "diskBalancer"; + + private static final String DISK_BALANCER_TMP_DIR = "tmp"; + + private OzoneContainer ozoneContainer; + private final ConfigurationSource conf; + + private boolean shouldRun = false; + private double threshold; + private long bandwidthInMB; + private int parallelThread; + + private DiskBalancerVersion version; + + private AtomicLong totalBalancedBytes = new AtomicLong(0L); + private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L); + private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow()); + + private Map<DiskBalancerTask, Integer> inProgressTasks; + private Set<Long> inProgressContainers; + + // Every time a container is decided to be moved from Vol A to Vol B, + // the size will be deducted from Vol A and added to Vol B. + // This map is used to help calculate the expected storage size after + // the container balancing finished. 
+ private Map<HddsVolume, Long> deltaSizes; + private MutableVolumeSet volumeSet; + + private final File diskBalancerInfoFile; + + private DiskBalancerServiceMetrics metrics; + + public DiskBalancerService(OzoneContainer ozoneContainer, + long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit, + int workerSize, ConfigurationSource conf) throws IOException { + super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize, + serviceCheckTimeout); + this.ozoneContainer = ozoneContainer; + this.conf = conf; + + String diskBalancerInfoPath = getDiskBalancerInfoPath(); + Preconditions.checkNotNull(diskBalancerInfoPath); + diskBalancerInfoFile = new File(diskBalancerInfoPath); + + inProgressTasks = new ConcurrentHashMap<>(); + inProgressContainers = ConcurrentHashMap.newKeySet(); + deltaSizes = new ConcurrentHashMap<>(); + volumeSet = ozoneContainer.getVolumeSet(); + + metrics = DiskBalancerServiceMetrics.create(); + + loadDiskBalancerInfo(); + + constructTmpDir(); + } + + /** + * Update DiskBalancerService based on new DiskBalancerInfo. + * @param diskBalancerInfo + * @throws IOException + */ + public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException { + applyDiskBalancerInfo(diskBalancerInfo); + } + + private void constructTmpDir() throws IOException { + for (HddsVolume volume: + StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) { + Path tmpDir = getDiskBalancerTmpDir(volume); + try { + FileUtils.deleteFully(tmpDir); + FileUtils.createDirectories(tmpDir); + } catch (IOException ex) { + LOG.warn("Can not reconstruct tmp directory under volume {}", volume, + ex); + throw ex; + } + } + } + + /** + * If the diskBalancer.info file exists, load the file. If not exists, + * return the default config. 
+ * @throws IOException + */ + private void loadDiskBalancerInfo() throws IOException { + DiskBalancerInfo diskBalancerInfo; + try { + if (diskBalancerInfoFile.exists()) { + diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile); + } else { + boolean shouldRunDefault = + conf.getObject(DiskBalancerConfiguration.class) + .getDiskBalancerShouldRun(); + diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault, + new DiskBalancerConfiguration()); + } + } catch (IOException e) { + LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. " + + "Falling back to default configs", e); + throw e; + } + + applyDiskBalancerInfo(diskBalancerInfo); + } + + private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo) + throws IOException { + // First store in local file, then update in memory variables + writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile); + + setShouldRun(diskBalancerInfo.isShouldRun()); + setThreshold(diskBalancerInfo.getThreshold()); + setBandwidthInMB(diskBalancerInfo.getBandwidthInMB()); + setParallelThread(diskBalancerInfo.getParallelThread()); + setVersion(diskBalancerInfo.getVersion()); + + // Default executorService is ScheduledThreadPoolExecutor, so we can + // update the poll size by setting corePoolSize. + if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) { + ((ScheduledThreadPoolExecutor) getExecutorService()) + .setCorePoolSize(parallelThread); + } + } + + private String getDiskBalancerInfoPath() { + String diskBalancerInfoDir = + conf.getObject(DiskBalancerConfiguration.class) + .getDiskBalancerInfoDir(); + if (Strings.isNullOrEmpty(diskBalancerInfoDir)) { + File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf); + if (metaDirPath == null) { + // this means meta data is not found, in theory should not happen at + // this point because should've failed earlier. 
+ throw new IllegalArgumentException("Unable to locate meta data" + + "directory when getting datanode disk balancer file path"); + } + diskBalancerInfoDir = metaDirPath.toString(); + } + // Use default datanode disk balancer file name for file path + return new File(diskBalancerInfoDir, + OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT) + .toString(); + } + + /** + * Read {@link DiskBalancerInfo} from a local info file. + * + * @param path DiskBalancerInfo file local path + * @return {@link DatanodeDetails} + * @throws IOException If the conf file is malformed or other I/O exceptions + */ + private synchronized DiskBalancerInfo readDiskBalancerInfoFile( + File path) throws IOException { + if (!path.exists()) { + throw new IOException("DiskBalancerConf file not found."); + } + try { + return DiskBalancerYaml.readDiskBalancerInfoFile(path); + } catch (IOException e) { + LOG.warn("Error loading DiskBalancerInfo yaml from {}", + path.getAbsolutePath(), e); + throw new IOException("Failed to parse DiskBalancerInfo from " + + path.getAbsolutePath(), e); + } + } + + /** + * Persistent a {@link DiskBalancerInfo} to a local file. 
+ * + * @throws IOException when read/write error occurs + */ + private synchronized void writeDiskBalancerInfoTo( + DiskBalancerInfo diskBalancerInfo, File path) + throws IOException { + if (path.exists()) { + if (!path.delete() || !path.createNewFile()) { + throw new IOException("Unable to overwrite the DiskBalancerInfo file."); + } + } else { + if (!path.getParentFile().exists() && + !path.getParentFile().mkdirs()) { + throw new IOException("Unable to create DiskBalancerInfo directories."); + } + } + DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path); + } + + + + public void setShouldRun(boolean shouldRun) { + this.shouldRun = shouldRun; + } + + public void setThreshold(double threshold) { + this.threshold = threshold; + } + + public void setBandwidthInMB(long bandwidthInMB) { + this.bandwidthInMB = bandwidthInMB; + } + + public void setParallelThread(int parallelThread) { + this.parallelThread = parallelThread; + } + + public void setVersion(DiskBalancerVersion version) { + this.version = version; + } + + public DiskBalancerInfo getDiskBalancerInfo() { + return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB, + parallelThread, version); + } + + public DiskBalancerReportProto getDiskBalancerReportProto() { + DiskBalancerReportProto.Builder builder = + DiskBalancerReportProto.newBuilder(); + return builder.setIsRunning(shouldRun) + .setBalancedBytes(totalBalancedBytes.get()) + .setDiskBalancerConf( + HddsProtos.DiskBalancerConfigurationProto.newBuilder() + .setThreshold(threshold) + .setDiskBandwidthInMB(bandwidthInMB) + .setParallelThread(parallelThread) + .build()) + .build(); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + + if (!shouldRun) { + return queue; + } + metrics.incrRunningLoopCount(); + + if (shouldDelay()) { + metrics.incrIdleLoopExceedsBandwidthCount(); + return queue; + } + + int availableTaskCount = parallelThread - inProgressTasks.size(); + if 
(availableTaskCount <= 0) { + LOG.info("No available thread for disk balancer service. " + + "Current thread count is {}.", parallelThread); + return queue; + } + + // TODO: Implementation for choose tasks + + if (queue.isEmpty()) { + metrics.incrIdleLoopNoAvailableVolumePairCount(); + } + return queue; + } + + private boolean shouldDelay() { + // We should wait for next AvailableTime. + if (Time.monotonicNow() <= nextAvailableTime.get()) { + return true; + } + // Calculate the next AvailableTime based on bandwidth + long bytesBalanced = balancedBytesInLastWindow.getAndSet(0L); + + final int megaByte = 1024 * 1024; + + // converting disk bandwidth in byte/millisec + float bytesPerMillisec = bandwidthInMB * megaByte / 1000f; + nextAvailableTime.set(Time.monotonicNow() + + ((long) (bytesBalanced / bytesPerMillisec))); + return false; + } + + private class DiskBalancerTask implements BackgroundTask { + + private HddsVolume sourceVolume; + private HddsVolume destVolume; + private ContainerData containerData; + + DiskBalancerTask(ContainerData containerData, + HddsVolume sourceVolume, HddsVolume destVolume) { + this.containerData = containerData; + this.sourceVolume = sourceVolume; + this.destVolume = destVolume; + } + + @Override + public BackgroundTaskResult call() { + // TODO: Details of handling tasks + return BackgroundTaskResult.EmptyTaskResult.newResult(); + } + + @Override + public int getPriority() { + return BackgroundTask.super.getPriority(); + } + + private void postCall() { + inProgressContainers.remove(containerData.getContainerID()); + deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) + + containerData.getBytesUsed()); + deltaSizes.put(destVolume, deltaSizes.get(destVolume) + - containerData.getBytesUsed()); + } + } + + private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) { + return Paths.get(hddsVolume.getVolumeRootDir()) + .resolve(DISK_BALANCER_TMP_DIR).resolve(DISK_BALANCER_DIR); + } + + public boolean isBalancingContainer(long 
containerId) { + return inProgressContainers.contains(containerId); + } + + public DiskBalancerServiceMetrics getMetrics() { + return metrics; + } + + @VisibleForTesting + public void setBalancedBytesInLastWindow(long bytes) { + this.balancedBytesInLastWindow.set(bytes); + } + + @Override + public void shutdown() { + super.shutdown(); + if (metrics != null) { + DiskBalancerServiceMetrics.unRegister(); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java new file mode 100644 index 0000000000..3cf087b334 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.diskbalancer; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * Metrics related to DiskBalancer Service running on Datanode. + */ +@Metrics(name = "DiskBalancerService Metrics", about = "Metrics related to " + + "background disk balancer service on Datanode", context = "dfs") +public final class DiskBalancerServiceMetrics { + + private static DiskBalancerServiceMetrics instance; + public static final String SOURCE_NAME = + DiskBalancerServiceMetrics.class.getSimpleName(); + + @Metric(about = "The number of successful balance job") + private MutableCounterLong successCount; + + @Metric(about = "The total bytes for successfully balanced job.") + private MutableCounterLong successBytes; + + @Metric(about = "The number of failed balance job.") + private MutableCounterLong failureCount; + + @Metric(about = "The number of total running loop") + private MutableCounterLong runningLoopCount; + + @Metric(about = "The number of idle loop that can not generate volume pair.") + private MutableCounterLong idleLoopNoAvailableVolumePairCount; + + @Metric(about = "The number of idle loop due to bandwidth limits.") + private MutableCounterLong idleLoopExceedsBandwidthCount; + + private DiskBalancerServiceMetrics() { + } + + public static DiskBalancerServiceMetrics create() { + if (instance == null) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + instance = ms.register(SOURCE_NAME, "DiskBalancerService", + new DiskBalancerServiceMetrics()); + } + + return instance; + } + + /** + * Unregister the metrics instance. 
+ */ + public static void unRegister() { + instance = null; + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + public void incrSuccessCount(long count) { + this.successCount.incr(count); + } + + public void incrSuccessBytes(long bytes) { + this.successBytes.incr(bytes); + } + + public void incrFailureCount() { + this.failureCount.incr(); + } + + public void incrRunningLoopCount() { + this.runningLoopCount.incr(); + } + + public void incrIdleLoopNoAvailableVolumePairCount() { + this.idleLoopNoAvailableVolumePairCount.incr(); + } + + public void incrIdleLoopExceedsBandwidthCount() { + this.idleLoopExceedsBandwidthCount.incr(); + } + + public long getSuccessCount() { + return successCount.value(); + } + + public long getSuccessBytes() { + return successBytes.value(); + } + + public long getFailureCount() { + return failureCount.value(); + } + + public long getRunningLoopCount() { + return runningLoopCount.value(); + } + + public long getIdleLoopNoAvailableVolumePairCount() { + return idleLoopNoAvailableVolumePairCount.value(); + } + + public long getIdleLoopExceedsBandwidthCount() { + return idleLoopExceedsBandwidthCount.value(); + } + + @Override + public String toString() { + StringBuffer buffer = new StringBuffer(); + buffer.append("successCount = " + successCount.value()).append("\t") + .append("successBytes = " + successBytes.value()).append("\t") + .append("failureCount = " + failureCount.value()).append("\t") + .append("idleLoopNoAvailableVolumePairCount = " + + idleLoopNoAvailableVolumePairCount.value()).append("\t") + .append("idleLoopExceedsBandwidthCount = " + + idleLoopExceedsBandwidthCount.value()); + return buffer.toString(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java new file mode 100644 index 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Defines versions for the DiskBalancerService.
 */
public enum DiskBalancerVersion {
  // Removed the redundant empty anonymous class body that was attached to
  // this constant; it added a synthetic subclass for no behavioral gain.
  ONE(1, "First Version");

  /** Numeric form of this version, as persisted/sent on the wire. */
  private final int version;
  /** Human-readable description of this version. */
  private final String description;

  /** Version assumed when none is explicitly specified. */
  public static final DiskBalancerVersion
      DEFAULT_VERSION = DiskBalancerVersion.ONE;

  DiskBalancerVersion(int version, String description) {
    this.version = version;
    this.description = description;
  }

  /**
   * Looks up a version by its numeric value.
   *
   * @param version numeric version to resolve
   * @return the matching version, or {@code null} if unknown
   */
  public static DiskBalancerVersion getDiskBalancerVersion(int version) {
    // Iterating values() directly avoids the previous Guava ImmutableList
    // cache, which pulled in a third-party dependency for a trivial scan.
    for (DiskBalancerVersion diskBalancerVersion : values()) {
      if (diskBalancerVersion.getVersion() == version) {
        return diskBalancerVersion;
      }
    }
    return null;
  }

  /**
   * Looks up a version by its enum name, case-insensitively.
   *
   * @param versionStr name to resolve, e.g. "ONE"
   * @return the matching version, or {@code null} if unknown
   */
  public static DiskBalancerVersion getDiskBalancerVersion(String versionStr) {
    for (DiskBalancerVersion diskBalancerVersion : values()) {
      if (diskBalancerVersion.toString().equalsIgnoreCase(versionStr)) {
        return diskBalancerVersion;
      }
    }
    return null;
  }

  /**
   * @return version number.
   */
  public int getVersion() {
    return version;
  }

  /**
   * @return description.
   */
  public String getDescription() {
    return description;
  }
}
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.diskbalancer; + +import org.yaml.snakeyaml.DumperOptions; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; + +/** + * Class for creating diskbalancer.info file in yaml format. + */ + +public final class DiskBalancerYaml { + + private DiskBalancerYaml() { + // static helper methods only, no state. + } + + /** + * Creates a yaml file to store DiskBalancer info. + * + * @param diskBalancerInfo {@link DiskBalancerInfo} + * @param path Path to diskBalancer.info file + */ + public static void createDiskBalancerInfoFile( + DiskBalancerInfo diskBalancerInfo, File path) + throws IOException { + DumperOptions options = new DumperOptions(); + options.setPrettyFlow(true); + options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW); + Yaml yaml = new Yaml(options); + + try (Writer writer = new OutputStreamWriter( + new FileOutputStream(path), StandardCharsets.UTF_8)) { + yaml.dump(getDiskBalancerInfoYaml(diskBalancerInfo), writer); + } + } + + /** + * Read DiskBalancerConfiguration from file. 
+ */ + public static DiskBalancerInfo readDiskBalancerInfoFile(File path) + throws IOException { + DiskBalancerInfo diskBalancerInfo; + + try (FileInputStream inputFileStream = new FileInputStream(path)) { + Yaml yaml = new Yaml(); + DiskBalancerInfoYaml diskBalancerInfoYaml; + try { + diskBalancerInfoYaml = + yaml.loadAs(inputFileStream, DiskBalancerInfoYaml.class); + } catch (Exception e) { + throw new IOException("Unable to parse yaml file.", e); + } + + diskBalancerInfo = new DiskBalancerInfo( + diskBalancerInfoYaml.isShouldRun(), + diskBalancerInfoYaml.getThreshold(), + diskBalancerInfoYaml.getBandwidthInMB(), + diskBalancerInfoYaml.getParallelThread(), + DiskBalancerVersion.getDiskBalancerVersion( + diskBalancerInfoYaml.version)); + } + + return diskBalancerInfo; + } + + /** + * Datanode DiskBalancer Info to be written to the yaml file. + */ + public static class DiskBalancerInfoYaml { + private boolean shouldRun; + private double threshold; + private long bandwidthInMB; + private int parallelThread; + + private int version; + + public DiskBalancerInfoYaml() { + // Needed for snake-yaml introspection. 
+ } + + private DiskBalancerInfoYaml(boolean shouldRun, double threshold, + long bandwidthInMB, int parallelThread, int version) { + this.shouldRun = shouldRun; + this.threshold = threshold; + this.bandwidthInMB = bandwidthInMB; + this.parallelThread = parallelThread; + this.version = version; + } + + public boolean isShouldRun() { + return shouldRun; + } + + public void setShouldRun(boolean shouldRun) { + this.shouldRun = shouldRun; + } + + public void setThreshold(double threshold) { + this.threshold = threshold; + } + + public double getThreshold() { + return threshold; + } + + public void setBandwidthInMB(long bandwidthInMB) { + this.bandwidthInMB = bandwidthInMB; + } + + public long getBandwidthInMB() { + return this.bandwidthInMB; + } + + public void setParallelThread(int parallelThread) { + this.parallelThread = parallelThread; + } + + public int getParallelThread() { + return this.parallelThread; + } + + public void setVersion(int version) { + this.version = version; + } + + public int getVersion() { + return this.version; + } + } + + private static DiskBalancerInfoYaml getDiskBalancerInfoYaml( + DiskBalancerInfo diskBalancerInfo) { + + return new DiskBalancerInfoYaml( + diskBalancerInfo.isShouldRun(), + diskBalancerInfo.getThreshold(), + diskBalancerInfo.getBandwidthInMB(), + diskBalancerInfo.getParallelThread(), + diskBalancerInfo.getVersion().getVersion()); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java new file mode 100644 index 0000000000..d8ba71eb9d --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + This package contains classes related to the DiskBalancer service. + */ +package org.apache.hadoop.ozone.container.diskbalancer; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index f81b58e317..a79a160e81 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.security.SecurityConfig; import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient; +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.utils.HddsServerUtil; @@ -56,6 +57,8 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType; import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService; import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService; import org.apache.hadoop.ozone.container.replication.ContainerImporter; import org.apache.hadoop.ozone.container.replication.ReplicationServer; @@ -118,6 +121,7 @@ public class OzoneContainer { private final StaleRecoveringContainerScrubbingService recoveringContainerScrubbingService; private final GrpcTlsConfig tlsClientConfig; + private final DiskBalancerService diskBalancerService; private final AtomicReference<InitializingStatus> initializingStatus; private final ReplicationServer replicationServer; private DatanodeDetails datanodeDetails; @@ -239,6 +243,15 @@ public class OzoneContainer { datanodeDetails.threadNamePrefix(), context.getParent().getReconfigurationHandler()); + Duration diskBalancerSvcInterval = conf.getObject( + DiskBalancerConfiguration.class).getDiskBalancerInterval(); + Duration diskBalancerSvcTimeout = conf.getObject( + DiskBalancerConfiguration.class).getDiskBalancerTimeout(); + diskBalancerService = + new DiskBalancerService(this, diskBalancerSvcInterval.toMillis(), + diskBalancerSvcTimeout.toMillis(), TimeUnit.MILLISECONDS, 1, + config); + Duration recoveringContainerScrubbingSvcInterval = conf.getObject( DatanodeConfiguration.class).getRecoveringContainerScrubInterval(); @@ -468,6 +481,7 @@ public class OzoneContainer { hddsDispatcher.init(); hddsDispatcher.setClusterId(clusterId); blockDeletingService.start(); + diskBalancerService.start(); recoveringContainerScrubbingService.start(); // 
mark OzoneContainer as INITIALIZED. @@ -493,6 +507,7 @@ public class OzoneContainer { dbVolumeSet.shutdown(); } blockDeletingService.shutdown(); + diskBalancerService.shutdown(); recoveringContainerScrubbingService.shutdown(); ContainerMetrics.remove(); } @@ -592,7 +607,14 @@ public class OzoneContainer { } public DiskBalancerReportProto getDiskBalancerReport() { - // TODO: Return real disk balancer report - return null; + return diskBalancerService.getDiskBalancerReportProto(); + } + + public DiskBalancerInfo getDiskBalancerInfo() { + return diskBalancerService.getDiskBalancerInfo(); + } + + public DiskBalancerService getDiskBalancerService() { + return diskBalancerService; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java index ea97f34ca0..1780877787 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; @@ -29,12 +30,12 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt */ public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> { - private final boolean shouldRun; + private final HddsProtos.DiskBalancerOpType opType; private final DiskBalancerConfiguration diskBalancerConfiguration; - public DiskBalancerCommand(final boolean shouldRun, + public 
DiskBalancerCommand(final HddsProtos.DiskBalancerOpType opType, final DiskBalancerConfiguration diskBalancerConfiguration) { - this.shouldRun = shouldRun; + this.opType = opType; this.diskBalancerConfiguration = diskBalancerConfiguration; } @@ -50,22 +51,26 @@ public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> { @Override public DiskBalancerCommandProto getProto() { - return DiskBalancerCommandProto.newBuilder() - .setShouldRun(shouldRun) - .setDiskBalancerConf(diskBalancerConfiguration.toProtobufBuilder()) - .build(); + DiskBalancerCommandProto.Builder builder = DiskBalancerCommandProto + .newBuilder().setOpType(opType); + // Stop command don't have diskBalancerConf + if (diskBalancerConfiguration != null) { + builder.setDiskBalancerConf( + diskBalancerConfiguration.toProtobufBuilder()); + } + return builder.build(); } public static DiskBalancerCommand getFromProtobuf(DiskBalancerCommandProto diskbalancerCommandProto, ConfigurationSource configuration) { Preconditions.checkNotNull(diskbalancerCommandProto); - return new DiskBalancerCommand(diskbalancerCommandProto.getShouldRun(), + return new DiskBalancerCommand(diskbalancerCommandProto.getOpType(), DiskBalancerConfiguration.fromProtobuf( diskbalancerCommandProto.getDiskBalancerConf(), configuration)); } - public boolean isShouldRun() { - return shouldRun; + public HddsProtos.DiskBalancerOpType getOpType() { + return opType; } public DiskBalancerConfiguration getDiskBalancerConfiguration() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java new file mode 100644 index 0000000000..0d87ac3420 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.diskbalancer; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A test class implementation for {@link DiskBalancerService}. 
+ */ +public class DiskBalancerServiceTestImpl extends DiskBalancerService { + + // the service timeout + private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0; + + // tests only + private CountDownLatch latch; + private Thread testingThread; + private AtomicInteger numOfProcessed = new AtomicInteger(0); + + public DiskBalancerServiceTestImpl(OzoneContainer container, + int serviceInterval, ConfigurationSource conf, int threadCount) + throws IOException { + super(container, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS, + TimeUnit.MILLISECONDS, threadCount, conf); + } + + public void runBalanceTasks() { + if (latch.getCount() > 0) { + this.latch.countDown(); + } else { + throw new IllegalStateException("Count already reaches zero"); + } + } + + public boolean isStarted() { + return latch != null && testingThread.isAlive(); + } + + public int getTimesOfProcessed() { + return numOfProcessed.get(); + } + + // Override the implementation to start a single on-call control thread. + @Override + public void start() { + PeriodicalTask svc = new PeriodicalTask(); + // In test mode, relies on a latch countdown to runDeletingTasks tasks. 
+ Runnable r = () -> { + while (true) { + latch = new CountDownLatch(1); + try { + latch.await(); + } catch (InterruptedException e) { + break; + } + Future<?> future = this.getExecutorService().submit(svc); + try { + // for tests, we only wait for 3s for completion + future.get(3000, TimeUnit.SECONDS); + numOfProcessed.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + return; + } + } + }; + + testingThread = new ThreadFactoryBuilder() + .setDaemon(true) + .build() + .newThread(r); + testingThread.start(); + } + + @Override + public void shutdown() { + testingThread.interrupt(); + super.shutdown(); + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java new file mode 100644 index 0000000000..c03f8cab75 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.ozone.container.diskbalancer;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.TestBlockDeletingService;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;

import java.io.File;
import java.io.IOException;
import java.util.UUID;

import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * This is a test class for DiskBalancerService.
 */
public class TestDiskBalancerService {
  // Root directory holding the mock datanode volumes for each test run.
  private File testRoot;
  private String scmId;
  private String datanodeUuid;
  private OzoneConfiguration conf;

  // Container layout/schema under test, injected per parameterized run.
  // NOTE(review): 'layout' is assigned but not referenced below -- kept for
  // parity with versionInfo; confirm whether it is needed.
  private final ContainerLayoutVersion layout;
  private final String schemaVersion;
  private MutableVolumeSet volumeSet;

  public TestDiskBalancerService(ContainerTestVersionInfo versionInfo) {
    this.layout = versionInfo.getLayout();
    this.schemaVersion = versionInfo.getSchemaVersion();
    conf = new OzoneConfiguration();
    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
  }


  @BeforeEach
  public void init() throws IOException {
    // Fresh test directory with two data volumes backed by a mocked
    // space-usage factory (HalfTera) so capacity numbers are deterministic.
    testRoot = GenericTestUtils
        .getTestDir(TestBlockDeletingService.class.getSimpleName());
    if (testRoot.exists()) {
      FileUtils.cleanDirectory(testRoot);
    }
    scmId = UUID.randomUUID().toString();
    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
        generateVolumeLocation(testRoot.getAbsolutePath(), 2));
    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
    conf.set("hdds.datanode.du.factory.classname",
        "org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory$HalfTera");
    datanodeUuid = UUID.randomUUID().toString();
    volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
        StorageVolume.VolumeType.DATA_VOLUME, null);
    createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
  }

  @AfterEach
  public void cleanup() throws IOException {
    BlockUtils.shutdownCache(conf);
    FileUtils.deleteDirectory(testRoot);
  }

  /**
   * Verifies that refresh() applies a new DiskBalancerInfo on top of the
   * initially configured values.
   */
  @Timeout(30)
  @ContainerTestVersionInfo.ContainerTest
  public void testUpdateService() throws Exception {
    // Increase volume's usedBytes
    for (StorageVolume volume : volumeSet.getVolumeMap().values()) {
      volume.incrementUsedSpace(volume.getCapacity() / 2);
    }

    ContainerSet containerSet = new ContainerSet(1000);
    ContainerMetrics metrics = ContainerMetrics.create(conf);
    KeyValueHandler keyValueHandler =
        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
            metrics, c -> {
        });
    DiskBalancerServiceTestImpl svc =
        getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1);

    // Set a low bandwidth to delay job
    svc.setShouldRun(true);
    svc.setThreshold(10.0d);
    svc.setBandwidthInMB(1L);
    svc.setParallelThread(5);
    svc.setVersion(DiskBalancerVersion.DEFAULT_VERSION);

    svc.start();

    assertTrue(svc.getDiskBalancerInfo().isShouldRun());
    assertEquals(10, svc.getDiskBalancerInfo().getThreshold(), 0.0);
    assertEquals(1, svc.getDiskBalancerInfo().getBandwidthInMB());
    assertEquals(5, svc.getDiskBalancerInfo().getParallelThread());

    DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10);
    svc.refresh(newInfo);

    assertFalse(svc.getDiskBalancerInfo().isShouldRun());
    assertEquals(20, svc.getDiskBalancerInfo().getThreshold(), 0.0);
    assertEquals(5, svc.getDiskBalancerInfo().getBandwidthInMB());
    assertEquals(10, svc.getDiskBalancerInfo().getParallelThread());

    svc.shutdown();
  }

  // Builds a comma-separated list of volume directories under 'base'.
  private String generateVolumeLocation(String base, int volumeCount) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < volumeCount; i++) {
      sb.append(base + "/vol" + i);
      sb.append(",");
    }
    return sb.substring(0, sb.length() - 1);
  }

  // Creates the test service instance wired against a mocked OzoneContainer.
  private DiskBalancerServiceTestImpl getDiskBalancerService(
      ContainerSet containerSet, ConfigurationSource config,
      KeyValueHandler keyValueHandler, ContainerController controller,
      int threadCount) throws IOException {
    OzoneContainer ozoneContainer =
        mockDependencies(containerSet, keyValueHandler, controller);
    return new DiskBalancerServiceTestImpl(ozoneContainer, 1000, config,
        threadCount);
  }

  // Mocks the OzoneContainer dependencies the service touches: container
  // set, dispatcher (always returning the KeyValueHandler), volume set
  // and controller.
  private OzoneContainer mockDependencies(ContainerSet containerSet,
      KeyValueHandler keyValueHandler, ContainerController controller) {
    OzoneContainer ozoneContainer = mock(OzoneContainer.class);
    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
    when(ozoneContainer.getWriteChannel()).thenReturn(null);
    ContainerDispatcher dispatcher = mock(ContainerDispatcher.class);
    when(ozoneContainer.getDispatcher()).thenReturn(dispatcher);
    when(dispatcher.getHandler(any())).thenReturn(keyValueHandler);
    when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet);
    when(ozoneContainer.getController()).thenReturn(controller);
    return ozoneContainer;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.ozone.container.diskbalancer;

import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;

import static org.apache.hadoop.ozone.OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT;

/**
 * Tests to test DiskBalancer's YAML operation.
 */
public class TestDiskBalancerYaml {
  /**
   * Round-trip test: writes a DiskBalancerInfo to the yaml info file and
   * verifies that reading it back yields an equal object.
   */
  @Test
  public void testCreateYaml() throws IOException {
    boolean shouldRun = true;
    double threshold = 10;
    long bandwidthInMB = 100;
    int parallelThread = 5;
    DiskBalancerVersion version = DiskBalancerVersion.DEFAULT_VERSION;

    File file = new File(GenericTestUtils.getTestDir(),
        OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT);

    DiskBalancerInfo info = new DiskBalancerInfo(shouldRun, threshold,
        bandwidthInMB, parallelThread, version);

    DiskBalancerYaml.createDiskBalancerInfoFile(info, file);

    DiskBalancerInfo newInfo = DiskBalancerYaml.readDiskBalancerInfoFile(file);

    // Relies on DiskBalancerInfo.equals() comparing all five properties.
    Assertions.assertEquals(info, newInfo);
  }
}
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; @@ -1168,7 +1167,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() - .setOpType(DatanodeDiskBalancerOpType.start) + .setOpType(HddsProtos.DiskBalancerOpType.START) .setConf(confBuilder); hosts.ifPresent(requestBuilder::addAllHosts); @@ -1190,7 +1189,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB throws IOException { DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() - .setOpType(DatanodeDiskBalancerOpType.stop); + .setOpType(HddsProtos.DiskBalancerOpType.STOP); hosts.ifPresent(requestBuilder::addAllHosts); DatanodeDiskBalancerOpResponseProto response = @@ -1219,7 +1218,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() - .setOpType(DatanodeDiskBalancerOpType.update) + .setOpType(HddsProtos.DiskBalancerOpType.UPDATE) .setConf(confBuilder); hosts.ifPresent(requestBuilder::addAllHosts); diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index fca2232921..7e660af307 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -382,14 +382,8 @@ message DatanodeDiskBalancerInfoResponseProto { repeated DatanodeDiskBalancerInfoProto info = 1; } -enum DatanodeDiskBalancerOpType{ - start = 1; - stop = 2; - update = 3; -} - message DatanodeDiskBalancerOpRequestProto { - required DatanodeDiskBalancerOpType opType = 1; + required DiskBalancerOpType opType = 1; repeated string hosts = 2; optional DiskBalancerConfigurationProto conf = 3; } diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index a53d035109..430a1a7b0e 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -534,6 +534,12 @@ enum DiskBalancerRunningStatus { UNKNOWN = 3; } +enum DiskBalancerOpType{ + START = 1; + STOP = 2; + UPDATE = 3; +} + message DatanodeDiskBalancerInfoProto { required DatanodeDetailsProto node = 1; required double currentVolumeDensitySum = 2; diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 7a265dd0ba..f2f7d537a2 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -452,7 +452,7 @@ message DatanodeDetailsAndReplicaIndexProto { This command asks the datanode to update diskBalancer status */ message DiskBalancerCommandProto { - required bool shouldRun = 1; + required DiskBalancerOpType opType = 1; optional DiskBalancerConfigurationProto diskBalancerConf = 2; } diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java index c6114c52f5..1af9bfde4c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java @@ -24,10 +24,15 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.DatanodeAdminError; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +131,118 @@ public class DiskBalancerManager { } } + /** + * Send startDiskBalancer command to datanodes. + * If hosts is not specified, send commands to all healthy datanodes. 
+ * @param threshold new configuration of threshold + * @param bandwidthInMB new configuration of bandwidthInMB + * @param parallelThread new configuration of parallelThread + * @param hosts Datanodes that command will apply on + * @return Possible errors + * @throws IOException + */ + public List<DatanodeAdminError> startDiskBalancer( + Optional<Double> threshold, Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread, Optional<List<String>> hosts) + throws IOException { + List<DatanodeDetails> dns; + if (hosts.isPresent()) { + dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + useHostnames); + } else { + dns = nodeManager.getNodes(NodeStatus.inServiceHealthy()); + } + + List<DatanodeAdminError> errors = new ArrayList<>(); + for (DatanodeDetails dn : dns) { + try { + if (nodeManager.getNodeStatus(dn).isHealthy()) { + errors.add(new DatanodeAdminError(dn.getHostName(), + "Datanode not in healthy state")); + continue; + } + // If command doesn't have configuration change, then we reuse the + // latest configuration reported from Datnaodes + DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn, + threshold, bandwidthInMB, parallelThread); + DiskBalancerCommand command = new DiskBalancerCommand( + HddsProtos.DiskBalancerOpType.START, updateConf); + sendCommand(dn, command); + } catch (Exception e) { + errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage())); + } + } + return errors; + } + + /** + * Send stopDiskBalancer command to datanodes + * If hosts is not specified, send commands to all datanodes. 
+ * @param hosts Datanodes that command will apply on + * */ + public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts) + throws IOException { + List<DatanodeDetails> dns; + if (hosts.isPresent()) { + dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + useHostnames); + } else { + dns = nodeManager.getNodes(NodeStatus.inServiceHealthy()); + } + + List<DatanodeAdminError> errors = new ArrayList<>(); + for (DatanodeDetails dn : dns) { + try { + DiskBalancerCommand command = new DiskBalancerCommand( + HddsProtos.DiskBalancerOpType.STOP, null); + sendCommand(dn, command); + } catch (Exception e) { + errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage())); + } + } + return errors; + } + + + /** + * Send update DiskBalancerConf command to datanodes. + * If hosts is not specified, send commands to all healthy datanodes. + * @param threshold new configuration of threshold + * @param bandwidthInMB new configuration of bandwidthInMB + * @param parallelThread new configuration of parallelThread + * @param hosts Datanodes that command will apply on + * @return Possible errors + * @throws IOException + */ + public List<DatanodeAdminError> updateDiskBalancerConfiguration( + Optional<Double> threshold, Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread, Optional<List<String>> hosts) + throws IOException { + List<DatanodeDetails> dns; + if (hosts.isPresent()) { + dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + useHostnames); + } else { + dns = nodeManager.getNodes(NodeStatus.inServiceHealthy()); + } + + List<DatanodeAdminError> errors = new ArrayList<>(); + for (DatanodeDetails dn : dns) { + try { + // If command doesn't have configuration change, then we reuse the + // latest configuration reported from Datnaodes + DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn, + threshold, bandwidthInMB, parallelThread); + DiskBalancerCommand command = new DiskBalancerCommand( + 
HddsProtos.DiskBalancerOpType.UPDATE, updateConf); + sendCommand(dn, command); + } catch (Exception e) { + errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage())); + } + } + return errors; + } + private boolean shouldReturnDatanode( HddsProtos.DiskBalancerRunningStatus status, DatanodeDetails datanodeDetails) { @@ -218,6 +335,30 @@ public class DiskBalancerManager { } } + private DiskBalancerConfiguration attachDiskBalancerConf( + DatanodeDetails dn, Optional<Double> threshold, + Optional<Long> bandwidthInMB, Optional<Integer> parallelThread) { + DiskBalancerConfiguration baseConf = statusMap.containsKey(dn) ? + statusMap.get(dn).getDiskBalancerConfiguration() : + new DiskBalancerConfiguration(); + threshold.ifPresent(baseConf::setThreshold); + bandwidthInMB.ifPresent(baseConf::setDiskBandwidthInMB); + parallelThread.ifPresent(baseConf::setParallelThread); + return baseConf; + } + + private void sendCommand(DatanodeDetails dn, DiskBalancerCommand command) { + try { + command.setTerm(scmContext.getTermOfLeader()); + } catch (NotLeaderException nle) { + LOG.warn("Skip sending DiskBalancerCommand for Datanode {}," + + " since not leader SCM.", dn.getUuidString()); + return; + } + scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, + new CommandForDatanode<>(dn.getUuid(), command)); + } + @VisibleForTesting public Map<DatanodeDetails, DiskBalancerStatus> getStatusMap() { return statusMap; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 60b17d1fcb..ec7b486795 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -1347,21 +1347,21 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB throws IOException { List<DatanodeAdminError> errors; switch (request.getOpType()) { - case start: + case START: errors = impl.startDiskBalancer( Optional.of(request.getConf().getThreshold()), Optional.of(request.getConf().getDiskBandwidthInMB()), Optional.of(request.getConf().getParallelThread()), Optional.of(request.getHostsList())); break; - case update: + case UPDATE: errors = impl.updateDiskBalancerConfiguration( Optional.of(request.getConf().getThreshold()), Optional.of(request.getConf().getDiskBandwidthInMB()), Optional.of(request.getConf().getParallelThread()), Optional.of(request.getHostsList())); break; - case stop: + case STOP: errors = impl.stopDiskBalancer(Optional.of(request.getHostsList())); break; default: diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 1704c61515..c5ab07154a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1274,6 +1274,7 @@ public class SCMClientProtocolServer implements ContainerID.valueOf(startContainerID), count, state); } + @Override public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport( int count, int clientVersion) throws IOException { // check admin authorisation @@ -1309,15 +1310,27 @@ public class SCMClientProtocolServer implements public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold, Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException { - // TODO: 
Send message to datanodes - return new ArrayList<DatanodeAdminError>(); + try { + getScm().checkAdminAccess(getRemoteUser(), false); + } catch (IOException e) { + LOG.error("Authorization failed", e); + throw e; + } + + return scm.getDiskBalancerManager() + .startDiskBalancer(threshold, bandwidthInMB, parallelThread, hosts); } @Override public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts) throws IOException { - // TODO: Send message to datanodes - return new ArrayList<DatanodeAdminError>(); + try { + getScm().checkAdminAccess(getRemoteUser(), false); + } catch (IOException e) { + LOG.error("Authorization failed", e); + throw e; + } + return scm.getDiskBalancerManager().stopDiskBalancer(hosts); } @@ -1326,8 +1339,15 @@ public class SCMClientProtocolServer implements Optional<Double> threshold, Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException { - // TODO: Send message to datanodes - return new ArrayList<DatanodeAdminError>(); + try { + getScm().checkAdminAccess(getRemoteUser(), false); + } catch (IOException e) { + LOG.error("Authorization failed", e); + throw e; + } + + return scm.getDiskBalancerManager().updateDiskBalancerConfiguration( + threshold, bandwidthInMB, parallelThread, hosts); } /** diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java index f58c7a1f84..fc87b145a7 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java @@ -86,4 +86,4 @@ public class DiskBalancerStartSubcommand extends ScmSubcommand { public void setAllHosts(boolean allHosts) { this.commonOptions.setAllHosts(allHosts); } -} \ No newline at end of file +} diff 
--git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java index 28ed4fd325..744d007539 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java @@ -84,4 +84,4 @@ public class DiskBalancerUpdateSubcommand extends ScmSubcommand { public void setAllHosts(boolean allHosts) { this.commonOptions.setAllHosts(allHosts); } -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
