This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new 1d71713712 HDDS-7383. Basic framework of DiskBalancerService (#3874)
1d71713712 is described below
commit 1d71713712a89346e2e2f6d24715a5181bedabae
Author: Symious <[email protected]>
AuthorDate: Thu Feb 23 17:10:58 2023 +0800
HDDS-7383. Basic framework of DiskBalancerService (#3874)
---
.../scm/storage/DiskBalancerConfiguration.java | 85 ++++-
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 +
.../hadoop/hdds/fs/MockSpaceUsageCheckFactory.java | 16 +
.../commandhandler/DiskBalancerCommandHandler.java | 39 +-
.../container/diskbalancer/DiskBalancerInfo.java | 130 +++++++
.../diskbalancer/DiskBalancerService.java | 397 +++++++++++++++++++++
.../diskbalancer/DiskBalancerServiceMetrics.java | 138 +++++++
.../diskbalancer/DiskBalancerVersion.java | 79 ++++
.../container/diskbalancer/DiskBalancerYaml.java | 165 +++++++++
.../ozone/container/diskbalancer/package-info.java | 22 ++
.../ozone/container/ozoneimpl/OzoneContainer.java | 26 +-
.../protocol/commands/DiskBalancerCommand.java | 25 +-
.../common/report/TestReportPublisher.java | 4 +-
.../diskbalancer/DiskBalancerServiceTestImpl.java | 104 ++++++
.../diskbalancer/TestDiskBalancerService.java | 179 ++++++++++
.../diskbalancer/TestDiskBalancerYaml.java | 53 +++
...inerLocationProtocolClientSideTranslatorPB.java | 7 +-
.../src/main/proto/ScmAdminProtocol.proto | 8 +-
.../interface-client/src/main/proto/hdds.proto | 6 +
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 2 +-
.../hadoop/hdds/scm/node/DiskBalancerManager.java | 141 ++++++++
...inerLocationProtocolServerSideTranslatorPB.java | 6 +-
.../hdds/scm/server/SCMClientProtocolServer.java | 32 +-
23 files changed, 1630 insertions(+), 37 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
index 43417f2e81..c41a49db1f 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -28,6 +28,11 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+
/**
* This class contains configuration values for the DiskBalancer.
*/
@@ -36,6 +41,17 @@ public final class DiskBalancerConfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(DiskBalancerConfiguration.class);
+  /**
+   * Directory the datanode DiskBalancer info is written to. Defaults to the
+   * empty string.
+   */
+  @Config(key = "info.dir",
+      defaultValue = "",
+      type = ConfigType.STRING,
+      tags = {ConfigTag.DISKBALANCER},
+      description = "The path where datanode diskBalancer's conf is to be " +
+          "written to. if this property is not defined, ozone will fall " +
+          "back to use metadata directory instead.")
+  private String infoDir;
+
+  /**
+   * @return the configured DiskBalancer info directory
+   */
+  public String getDiskBalancerInfoDir() {
+    return this.infoDir;
+  }
+
@Config(key = "volume.density.threshold", type = ConfigType.DOUBLE,
defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
description = "Threshold is a percentage in the range of 0 to 100. A " +
@@ -55,6 +71,72 @@ public final class DiskBalancerConfiguration {
description = "The max parallel balance thread count.")
private int parallelThread = 5;
+  /**
+   * Running state to use when no persisted DiskBalancer info is available.
+   */
+  @Config(key = "should.run.default",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      tags = { DATANODE, ConfigTag.DISKBALANCER},
+      description =
+          "If DiskBalancer fails to get information from diskBalancer.info, " +
+          "it will choose this value to decide if this service should be " +
+          "running."
+  )
+  private boolean diskBalancerShouldRun = false;
+
+  /**
+   * @return the default running state of the DiskBalancer service
+   */
+  public boolean getDiskBalancerShouldRun() {
+    return diskBalancerShouldRun;
+  }
+
+  /**
+   * @param shouldRun the default running state of the DiskBalancer service
+   */
+  public void setDiskBalancerShouldRun(boolean shouldRun) {
+    this.diskBalancerShouldRun = shouldRun;
+  }
+
+  /**
+   * Interval of the datanode DiskBalancer service, held in milliseconds.
+   */
+  @Config(key = "service.interval",
+      defaultValue = "60s",
+      type = ConfigType.TIME,
+      tags = { DATANODE, ConfigTag.DISKBALANCER},
+      description = "Time interval of the Datanode DiskBalancer service. " +
+          "The Datanode will check the service periodically and update " +
+          "the config and running status for DiskBalancer service. " +
+          "Unit could be defined with postfix (ns,ms,s,m,h,d). "
+  )
+  private long diskBalancerInterval = Duration.ofMinutes(1).toMillis();
+
+  /**
+   * @return the service interval as a {@link Duration}
+   */
+  public Duration getDiskBalancerInterval() {
+    final long intervalMillis = diskBalancerInterval;
+    return Duration.ofMillis(intervalMillis);
+  }
+
+  /**
+   * @param duration the new service interval; stored with millisecond
+   *                 precision
+   */
+  public void setDiskBalancerInterval(Duration duration) {
+    final long intervalMillis = duration.toMillis();
+    diskBalancerInterval = intervalMillis;
+  }
+
+  /**
+   * Timeout of the datanode DiskBalancer service, held in milliseconds.
+   */
+  @Config(key = "service.timeout",
+      defaultValue = "300s",
+      type = ConfigType.TIME,
+      tags = { DATANODE, ConfigTag.DISKBALANCER},
+      description = "Timeout for the Datanode DiskBalancer service. "
+          + "Unit could be defined with postfix (ns,ms,s,m,h,d). "
+  )
+  private long diskBalancerTimeout = Duration.ofMinutes(5).toMillis();
+
+  /**
+   * @return the service timeout as a {@link Duration}
+   */
+  public Duration getDiskBalancerTimeout() {
+    final long timeoutMillis = diskBalancerTimeout;
+    return Duration.ofMillis(timeoutMillis);
+  }
+
+  /**
+   * @param duration the new service timeout; stored with millisecond
+   *                 precision
+   */
+  public void setDiskBalancerTimeout(Duration duration) {
+    final long timeoutMillis = duration.toMillis();
+    diskBalancerTimeout = timeoutMillis;
+  }
+
+  /**
+   * Creates a configuration that keeps every field at its initial value.
+   */
+  public DiskBalancerConfiguration() {
+  }
+
+  /**
+   * Creates a configuration and overrides only the values that are present.
+   *
+   * @param threshold      optional override for the threshold
+   * @param bandwidthInMB  optional override for the disk bandwidth in MB
+   * @param parallelThread optional override for the parallel thread count
+   */
+  public DiskBalancerConfiguration(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread) {
+    if (threshold.isPresent()) {
+      this.threshold = threshold.get();
+    }
+    if (bandwidthInMB.isPresent()) {
+      this.diskBandwidthInMB = bandwidthInMB.get();
+    }
+    if (parallelThread.isPresent()) {
+      this.parallelThread = parallelThread.get();
+    }
+  }
+
/**
* Gets the threshold value for DiskBalancer.
*
@@ -86,7 +168,8 @@ public final class DiskBalancerConfiguration {
*
* @return max disk bandwidth per second
*/
- public double getDiskBandwidthInMB() {
+
+ public long getDiskBandwidthInMB() {
return diskBandwidthInMB;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 7f4f1e7b53..52ba476fdc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -243,6 +243,9 @@ public final class OzoneConsts {
*/
public static final String OZONE_SCM_DATANODE_ID_FILE_DEFAULT =
"datanode.id";
+ /**
+ * Default file name of the datanode DiskBalancer info file.
+ */
+ public static final String
+ OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT = "diskBalancer.info";
+
// The ServiceListJSONServlet context attribute where OzoneManager
// instance gets stored.
public static final String OM_CONTEXT_ATTRIBUTE = "ozone.om";
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
index 3b2e6415c2..eaeb3c1b4d 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
@@ -50,6 +50,22 @@ public final class MockSpaceUsageCheckFactory {
}
}
+  /**
+   * A factory whose space usage source is fixed at 512G for both values it
+   * is given (presumably capacity and available space - confirm against
+   * {@code MockSpaceUsageSource.fixed}). It uses a zero refresh duration and
+   * does not persist space usage info.
+   */
+  public static class HalfTera implements SpaceUsageCheckFactory {
+    @Override
+    public SpaceUsageCheckParams paramsFor(File dir) {
+      final long halfTeraBytes = 512L * 1024 * 1024 * 1024;
+      return new SpaceUsageCheckParams(
+          dir,
+          MockSpaceUsageSource.fixed(halfTeraBytes, halfTeraBytes),
+          Duration.ZERO,
+          SpaceUsagePersistence.None.INSTANCE);
+    }
+  }
+
private MockSpaceUsageCheckFactory() {
throw new UnsupportedOperationException("no instances");
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
index 705c0af0e1..d379ef2b69 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
@@ -16,15 +16,21 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -56,7 +62,38 @@ public class DiskBalancerCommandHandler implements
CommandHandler {
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
invocationCount.incrementAndGet();
- // TODO: Do start/stop/update operation
+ // Taken up front so the finally block can add the elapsed time to
+ // totalTime whether or not handling succeeds.
+ final long startTime = Time.monotonicNow();
+ // NOTE(review): assumes only DiskBalancerCommand instances are routed
+ // here; any other SCMCommand fails this cast - confirm against dispatcher.
+ DiskBalancerCommand diskBalancerCommand = (DiskBalancerCommand) command;
+
+ final HddsProtos.DiskBalancerOpType opType =
+ diskBalancerCommand.getOpType();
+ final DiskBalancerConfiguration diskBalancerConf =
+ diskBalancerCommand.getDiskBalancerConfiguration();
+
+ DiskBalancerInfo diskBalancerInfo = ozoneContainer.getDiskBalancerInfo();
+
+ try {
+ // START: enable and apply the received configuration.
+ // STOP: disable only; the received configuration is not read.
+ // UPDATE: apply the received configuration, running flag untouched.
+ switch (opType) {
+ case START:
+ diskBalancerInfo.setShouldRun(true);
+ diskBalancerInfo.updateFromConf(diskBalancerConf);
+ break;
+ case STOP:
+ diskBalancerInfo.setShouldRun(false);
+ break;
+ case UPDATE:
+ diskBalancerInfo.updateFromConf(diskBalancerConf);
+ break;
+ default:
+ throw new IOException("Unexpected type " + opType);
+ }
+ // Hand the modified info to the service, which persists and applies it.
+ ozoneContainer.getDiskBalancerService().refresh(diskBalancerInfo);
+ } catch (IOException e) {
+ // Failures (unknown op type or refresh error) are logged, not rethrown.
+ LOG.error("Can't handle command type: {}", opType, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
new file mode 100644
index 0000000000..873c89aad1
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+
+import java.util.Objects;
+
+/**
+ * DiskBalancer's information to persist.
+ *
+ * <p>Holds the running flag, the tunables (threshold, bandwidth in MB and
+ * parallel thread count) and the {@link DiskBalancerVersion} of the info.
+ * Instances are mutable and not synchronized.
+ */
+public class DiskBalancerInfo {
+  private boolean shouldRun;
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+  private DiskBalancerVersion version;
+
+  /**
+   * Creates an info object using {@link DiskBalancerVersion#DEFAULT_VERSION}.
+   */
+  public DiskBalancerInfo(boolean shouldRun, double threshold,
+      long bandwidthInMB, int parallelThread) {
+    this(shouldRun, threshold, bandwidthInMB, parallelThread,
+        DiskBalancerVersion.DEFAULT_VERSION);
+  }
+
+  /**
+   * Creates an info object with every value given explicitly.
+   */
+  public DiskBalancerInfo(boolean shouldRun, double threshold,
+      long bandwidthInMB, int parallelThread, DiskBalancerVersion version) {
+    this.shouldRun = shouldRun;
+    this.threshold = threshold;
+    this.bandwidthInMB = bandwidthInMB;
+    this.parallelThread = parallelThread;
+    this.version = version;
+  }
+
+  /**
+   * Creates an info object whose tunables are copied from the given
+   * configuration, using {@link DiskBalancerVersion#DEFAULT_VERSION}.
+   *
+   * @param shouldRun whether the balancer should run
+   * @param diskBalancerConf source of threshold, bandwidth and thread count;
+   *                         must not be null
+   */
+  public DiskBalancerInfo(boolean shouldRun,
+      DiskBalancerConfiguration diskBalancerConf) {
+    // Delegate so that all fields are initialised in exactly one place.
+    this(shouldRun, diskBalancerConf.getThreshold(),
+        diskBalancerConf.getDiskBandwidthInMB(),
+        diskBalancerConf.getParallelThread(),
+        DiskBalancerVersion.DEFAULT_VERSION);
+  }
+
+  /**
+   * Copies threshold, bandwidth and parallel thread count from the given
+   * configuration. The running flag and the version are left untouched.
+   *
+   * @param diskBalancerConf configuration to copy from; must not be null
+   * @throws NullPointerException if {@code diskBalancerConf} is null
+   */
+  public void updateFromConf(DiskBalancerConfiguration diskBalancerConf) {
+    Objects.requireNonNull(diskBalancerConf, "diskBalancerConf == null");
+    // Assigning an unchanged value is a no-op, so no compare-first guard.
+    setThreshold(diskBalancerConf.getThreshold());
+    setBandwidthInMB(diskBalancerConf.getDiskBandwidthInMB());
+    setParallelThread(diskBalancerConf.getParallelThread());
+  }
+
+  public boolean isShouldRun() {
+    return shouldRun;
+  }
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public double getThreshold() {
+    return threshold;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public long getBandwidthInMB() {
+    return bandwidthInMB;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public int getParallelThread() {
+    return parallelThread;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public DiskBalancerVersion getVersion() {
+    return version;
+  }
+
+  public void setVersion(DiskBalancerVersion version) {
+    this.version = version;
+  }
+
+  /**
+   * Two infos are equal when the running flag, all tunables and the version
+   * are equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DiskBalancerInfo that = (DiskBalancerInfo) o;
+    return shouldRun == that.shouldRun &&
+        Double.compare(that.threshold, threshold) == 0 &&
+        bandwidthInMB == that.bandwidthInMB &&
+        parallelThread == that.parallelThread &&
+        version == that.version;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread,
+        version);
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
new file mode 100644
index 0000000000..75e1660029
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A per-datanode disk balancing service in charge of moving containers
+ * among the disks (volumes) of the datanode.
+ *
+ * <p>The running flag and the tunables (threshold, bandwidth, parallel
+ * thread count, version) are written to a local info file before the
+ * in-memory copies are updated, see {@link #refresh(DiskBalancerInfo)}.
+ */
+public class DiskBalancerService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerService.class);
+
+  public static final String DISK_BALANCER_DIR = "diskBalancer";
+
+  private static final String DISK_BALANCER_TMP_DIR = "tmp";
+
+  private static final long BYTES_PER_MB = 1024L * 1024L;
+
+  private final OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  // Set through refresh() and read by getTasks() and
+  // getDiskBalancerReportProto(), which may run on different threads, so
+  // the values are volatile to be visible across threads.
+  private volatile boolean shouldRun = false;
+  private volatile double threshold;
+  private volatile long bandwidthInMB;
+  private volatile int parallelThread;
+
+  private volatile DiskBalancerVersion version;
+
+  private final AtomicLong totalBalancedBytes = new AtomicLong(0L);
+  private final AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
+  private final AtomicLong nextAvailableTime =
+      new AtomicLong(Time.monotonicNow());
+
+  private final Map<DiskBalancerTask, Integer> inProgressTasks;
+  private final Set<Long> inProgressContainers;
+
+  // Every time a container is decided to be moved from Vol A to Vol B,
+  // the size will be deducted from Vol A and added to Vol B.
+  // This map is used to help calculate the expected storage size after
+  // the container balancing finished.
+  private final Map<HddsVolume, Long> deltaSizes;
+  private final MutableVolumeSet volumeSet;
+
+  private final File diskBalancerInfoFile;
+
+  private final DiskBalancerServiceMetrics metrics;
+
+  /**
+   * Creates the service, loads (or initialises) the persisted
+   * DiskBalancer info and recreates the per-volume tmp directories.
+   *
+   * @param ozoneContainer container whose volume set is balanced
+   * @param serviceCheckInterval interval between two service runs
+   * @param serviceCheckTimeout timeout of a service run
+   * @param timeUnit unit passed to the parent service with the interval
+   * @param workerSize worker count passed to the parent service
+   * @param conf configuration used to locate the info file
+   * @throws IOException if the info file can not be read or written, or a
+   *                     tmp directory can not be recreated
+   */
+  public DiskBalancerService(OzoneContainer ozoneContainer,
+      long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
+      int workerSize, ConfigurationSource conf) throws IOException {
+    super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize,
+        serviceCheckTimeout);
+    this.ozoneContainer = ozoneContainer;
+    this.conf = conf;
+
+    String diskBalancerInfoPath = getDiskBalancerInfoPath();
+    Preconditions.checkNotNull(diskBalancerInfoPath);
+    diskBalancerInfoFile = new File(diskBalancerInfoPath);
+
+    inProgressTasks = new ConcurrentHashMap<>();
+    inProgressContainers = ConcurrentHashMap.newKeySet();
+    deltaSizes = new ConcurrentHashMap<>();
+    volumeSet = ozoneContainer.getVolumeSet();
+
+    metrics = DiskBalancerServiceMetrics.create();
+
+    loadDiskBalancerInfo();
+
+    constructTmpDir();
+  }
+
+  /**
+   * Update DiskBalancerService based on new DiskBalancerInfo.
+   *
+   * @param diskBalancerInfo the info to persist and apply
+   * @throws IOException if the info can not be written to the info file
+   */
+  public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  /**
+   * Deletes and recreates the DiskBalancer tmp directory on every HDDS
+   * volume of the volume set.
+   *
+   * @throws IOException if a directory can not be deleted or created
+   */
+  private void constructTmpDir() throws IOException {
+    for (HddsVolume volume:
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
+      Path tmpDir = getDiskBalancerTmpDir(volume);
+      try {
+        FileUtils.deleteFully(tmpDir);
+        FileUtils.createDirectories(tmpDir);
+      } catch (IOException ex) {
+        LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
+            ex);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * If the diskBalancer.info file exists, load the file. If it does not
+   * exist, start from a default {@link DiskBalancerConfiguration} together
+   * with the configured default running state. The resulting info is then
+   * persisted and applied.
+   *
+   * @throws IOException if the existing file can not be read, or the info
+   *                     can not be written back
+   */
+  private void loadDiskBalancerInfo() throws IOException {
+    DiskBalancerInfo diskBalancerInfo;
+    try {
+      if (diskBalancerInfoFile.exists()) {
+        diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile);
+      } else {
+        boolean shouldRunDefault =
+            conf.getObject(DiskBalancerConfiguration.class)
+                .getDiskBalancerShouldRun();
+        diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault,
+            new DiskBalancerConfiguration());
+      }
+    } catch (IOException e) {
+      // The failure is propagated; there is no fallback to defaults here.
+      LOG.warn("Can not load diskBalancerInfo from {}.",
+          diskBalancerInfoFile.getAbsolutePath(), e);
+      throw e;
+    }
+
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  /**
+   * Persists the given info and then copies it into this service.
+   *
+   * @throws IOException if the info can not be written; in that case the
+   *                     in-memory state is left unchanged
+   */
+  private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
+      throws IOException {
+    // First store in local file, then update in memory variables
+    writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
+
+    setShouldRun(diskBalancerInfo.isShouldRun());
+    setThreshold(diskBalancerInfo.getThreshold());
+    setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
+    setParallelThread(diskBalancerInfo.getParallelThread());
+    setVersion(diskBalancerInfo.getVersion());
+
+    // Default executorService is ScheduledThreadPoolExecutor, so we can
+    // update the pool size by setting corePoolSize.
+    if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) {
+      ((ScheduledThreadPoolExecutor) getExecutorService())
+          .setCorePoolSize(parallelThread);
+    }
+  }
+
+  /**
+   * Resolves the path of the info file: the configured info directory, or
+   * the ozone metadata directory when none is configured, joined with the
+   * default info file name.
+   *
+   * @throws IllegalArgumentException if no metadata directory can be found
+   */
+  private String getDiskBalancerInfoPath() {
+    String diskBalancerInfoDir =
+        conf.getObject(DiskBalancerConfiguration.class)
+            .getDiskBalancerInfoDir();
+    if (Strings.isNullOrEmpty(diskBalancerInfoDir)) {
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      if (metaDirPath == null) {
+        // this means meta data is not found, in theory should not happen at
+        // this point because should've failed earlier.
+        throw new IllegalArgumentException("Unable to locate meta data " +
+            "directory when getting datanode disk balancer file path");
+      }
+      diskBalancerInfoDir = metaDirPath.toString();
+    }
+    // Use default datanode disk balancer file name for file path
+    return new File(diskBalancerInfoDir,
+        OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT)
+        .toString();
+  }
+
+  /**
+   * Read {@link DiskBalancerInfo} from a local info file.
+   *
+   * @param path DiskBalancerInfo file local path
+   * @return the {@link DiskBalancerInfo} read from the file
+   * @throws IOException If the conf file is malformed or other I/O exceptions
+   */
+  private synchronized DiskBalancerInfo readDiskBalancerInfoFile(
+      File path) throws IOException {
+    if (!path.exists()) {
+      throw new IOException("DiskBalancerConf file not found.");
+    }
+    try {
+      return DiskBalancerYaml.readDiskBalancerInfoFile(path);
+    } catch (IOException e) {
+      LOG.warn("Error loading DiskBalancerInfo yaml from {}",
+          path.getAbsolutePath(), e);
+      throw new IOException("Failed to parse DiskBalancerInfo from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Persist a {@link DiskBalancerInfo} to a local file. An existing file is
+   * deleted and recreated; missing parent directories are created.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  private synchronized void writeDiskBalancerInfoTo(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException(
+            "Unable to overwrite the DiskBalancerInfo file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException(
+            "Unable to create DiskBalancerInfo directories.");
+      }
+    }
+    DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
+  }
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public void setVersion(DiskBalancerVersion version) {
+    this.version = version;
+  }
+
+  /**
+   * @return a new {@link DiskBalancerInfo} holding the current state
+   */
+  public DiskBalancerInfo getDiskBalancerInfo() {
+    return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
+        parallelThread, version);
+  }
+
+  /**
+   * @return a report carrying the running flag, the total balanced bytes
+   *         and the current threshold, bandwidth and parallel thread count
+   */
+  public DiskBalancerReportProto getDiskBalancerReportProto() {
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    return builder.setIsRunning(shouldRun)
+        .setBalancedBytes(totalBalancedBytes.get())
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(threshold)
+                .setDiskBandwidthInMB(bandwidthInMB)
+                .setParallelThread(parallelThread)
+                .build())
+        .build();
+  }
+
+  /**
+   * Builds the task queue for one run. The queue stays empty when the
+   * service should not run, when the bandwidth limit asks for a delay, or
+   * when all parallel threads are occupied by in-progress tasks.
+   */
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    if (!shouldRun) {
+      return queue;
+    }
+    metrics.incrRunningLoopCount();
+
+    if (shouldDelay()) {
+      metrics.incrIdleLoopExceedsBandwidthCount();
+      return queue;
+    }
+
+    int availableTaskCount = parallelThread - inProgressTasks.size();
+    if (availableTaskCount <= 0) {
+      LOG.info("No available thread for disk balancer service. " +
+          "Current thread count is {}.", parallelThread);
+      return queue;
+    }
+
+    // TODO: Implementation for choose tasks
+
+    if (queue.isEmpty()) {
+      metrics.incrIdleLoopNoAvailableVolumePairCount();
+    }
+    return queue;
+  }
+
+  /**
+   * Decides whether this run must be skipped to respect the bandwidth
+   * limit. When no delay is due, the bytes balanced in the last window are
+   * consumed and converted into the next point in time at which balancing
+   * may continue.
+   *
+   * @return true if the next available time has not been reached yet
+   */
+  private boolean shouldDelay() {
+    final long now = Time.monotonicNow();
+    // We should wait for next AvailableTime.
+    if (now <= nextAvailableTime.get()) {
+      return true;
+    }
+    // Calculate the next AvailableTime based on bandwidth
+    final long bytesBalanced = balancedBytesInLastWindow.getAndSet(0L);
+    final long bandwidth = bandwidthInMB;
+
+    long delayMillis = 0L;
+    // A non-positive bandwidth can not be turned into a rate; dividing by
+    // it would yield Infinity/NaN and overflow the timestamp. No throttling
+    // is applied in that case.
+    if (bandwidth > 0) {
+      // converting disk bandwidth in byte/millisec
+      final double bytesPerMillisec = bandwidth * (double) BYTES_PER_MB / 1000d;
+      delayMillis = (long) (bytesBalanced / bytesPerMillisec);
+    }
+    nextAvailableTime.set(now + delayMillis);
+    return false;
+  }
+
+  /**
+   * Task describing the move of one container from a source volume to a
+   * destination volume.
+   */
+  private class DiskBalancerTask implements BackgroundTask {
+
+    private final HddsVolume sourceVolume;
+    private final HddsVolume destVolume;
+    private final ContainerData containerData;
+
+    DiskBalancerTask(ContainerData containerData,
+        HddsVolume sourceVolume, HddsVolume destVolume) {
+      this.containerData = containerData;
+      this.sourceVolume = sourceVolume;
+      this.destVolume = destVolume;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // TODO: Details of handling tasks
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    @Override
+    public int getPriority() {
+      return BackgroundTask.super.getPriority();
+    }
+
+    /**
+     * Removes the container from the in-progress set and adds the
+     * container size back to the source volume's delta while deducting it
+     * from the destination volume's delta.
+     */
+    private void postCall() {
+      inProgressContainers.remove(containerData.getContainerID());
+      final long bytesUsed = containerData.getBytesUsed();
+      // merge() updates atomically and treats a missing entry as 0 instead
+      // of failing on a null value.
+      deltaSizes.merge(sourceVolume, bytesUsed, Long::sum);
+      deltaSizes.merge(destVolume, -bytesUsed, Long::sum);
+    }
+  }
+
+  private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
+    return Paths.get(hddsVolume.getVolumeRootDir())
+        .resolve(DISK_BALANCER_TMP_DIR).resolve(DISK_BALANCER_DIR);
+  }
+
+  /**
+   * @return true if the given container is tracked as being balanced
+   */
+  public boolean isBalancingContainer(long containerId) {
+    return inProgressContainers.contains(containerId);
+  }
+
+  public DiskBalancerServiceMetrics getMetrics() {
+    return metrics;
+  }
+
+  @VisibleForTesting
+  public void setBalancedBytesInLastWindow(long bytes) {
+    this.balancedBytesInLastWindow.set(bytes);
+  }
+
+  /**
+   * Shuts down the background service and unregisters the metrics source.
+   */
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (metrics != null) {
+      DiskBalancerServiceMetrics.unRegister();
+    }
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
new file mode 100644
index 0000000000..3cf087b334
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Metrics related to DiskBalancer Service running on Datanode.
+ *
+ * <p>A single instance is registered with the default metrics system under
+ * {@link #SOURCE_NAME}; obtain it with {@link #create()} and release it
+ * with {@link #unRegister()}.
+ */
+@Metrics(name = "DiskBalancerService Metrics", about = "Metrics related to "
+    + "background disk balancer service on Datanode", context = "dfs")
+public final class DiskBalancerServiceMetrics {
+
+  private static DiskBalancerServiceMetrics instance;
+  public static final String SOURCE_NAME =
+      DiskBalancerServiceMetrics.class.getSimpleName();
+
+  @Metric(about = "The number of successful balance job")
+  private MutableCounterLong successCount;
+
+  @Metric(about = "The total bytes for successfully balanced job.")
+  private MutableCounterLong successBytes;
+
+  @Metric(about = "The number of failed balance job.")
+  private MutableCounterLong failureCount;
+
+  @Metric(about = "The number of total running loop")
+  private MutableCounterLong runningLoopCount;
+
+  @Metric(about = "The number of idle loop that can not generate volume pair.")
+  private MutableCounterLong idleLoopNoAvailableVolumePairCount;
+
+  @Metric(about = "The number of idle loop due to bandwidth limits.")
+  private MutableCounterLong idleLoopExceedsBandwidthCount;
+
+  private DiskBalancerServiceMetrics() {
+  }
+
+  /**
+   * Returns the shared metrics instance, registering it with the default
+   * metrics system on first use.
+   *
+   * <p>Synchronized so that concurrent callers can not both observe a null
+   * instance and register the source twice.
+   */
+  public static synchronized DiskBalancerServiceMetrics create() {
+    if (instance == null) {
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      instance = ms.register(SOURCE_NAME, "DiskBalancerService",
+          new DiskBalancerServiceMetrics());
+    }
+
+    return instance;
+  }
+
+  /**
+   * Unregister the metrics instance.
+   */
+  public static synchronized void unRegister() {
+    instance = null;
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  public void incrSuccessCount(long count) {
+    this.successCount.incr(count);
+  }
+
+  public void incrSuccessBytes(long bytes) {
+    this.successBytes.incr(bytes);
+  }
+
+  public void incrFailureCount() {
+    this.failureCount.incr();
+  }
+
+  public void incrRunningLoopCount() {
+    this.runningLoopCount.incr();
+  }
+
+  public void incrIdleLoopNoAvailableVolumePairCount() {
+    this.idleLoopNoAvailableVolumePairCount.incr();
+  }
+
+  public void incrIdleLoopExceedsBandwidthCount() {
+    this.idleLoopExceedsBandwidthCount.incr();
+  }
+
+  public long getSuccessCount() {
+    return successCount.value();
+  }
+
+  public long getSuccessBytes() {
+    return successBytes.value();
+  }
+
+  public long getFailureCount() {
+    return failureCount.value();
+  }
+
+  public long getRunningLoopCount() {
+    return runningLoopCount.value();
+  }
+
+  public long getIdleLoopNoAvailableVolumePairCount() {
+    return idleLoopNoAvailableVolumePairCount.value();
+  }
+
+  public long getIdleLoopExceedsBandwidthCount() {
+    return idleLoopExceedsBandwidthCount.value();
+  }
+
+  /**
+   * @return all counters as tab separated {@code name = value} pairs
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append("successCount = ").append(successCount.value())
+        .append("\t")
+        .append("successBytes = ").append(successBytes.value())
+        .append("\t")
+        .append("failureCount = ").append(failureCount.value())
+        .append("\t")
+        .append("runningLoopCount = ").append(runningLoopCount.value())
+        .append("\t")
+        .append("idleLoopNoAvailableVolumePairCount = ")
+        .append(idleLoopNoAvailableVolumePairCount.value())
+        .append("\t")
+        .append("idleLoopExceedsBandwidthCount = ")
+        .append(idleLoopExceedsBandwidthCount.value())
+        .toString();
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java
new file mode 100644
index 0000000000..28da8993c1
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Defines versions for the DiskBalancerService.
+ */
+public enum DiskBalancerVersion {
+  ONE(1, "First Version");
+
+  /** Version used when none is specified explicitly. */
+  public static final DiskBalancerVersion
+      DEFAULT_VERSION = DiskBalancerVersion.ONE;
+
+  /** Immutable snapshot of {@link #values()} used by the lookups below. */
+  private static final List<DiskBalancerVersion> DISK_BALANCER_VERSIONS =
+      ImmutableList.copyOf(values());
+
+  private final int version;
+  private final String description;
+
+  DiskBalancerVersion(int version, String description) {
+    this.version = version;
+    this.description = description;
+  }
+
+  /**
+   * Looks up a version by its numeric value.
+   *
+   * @param version numeric version to search for
+   * @return the constant whose {@link #getVersion()} equals
+   * {@code version}, or {@code null} when no constant matches
+   */
+  public static DiskBalancerVersion getDiskBalancerVersion(int version) {
+    for (DiskBalancerVersion candidate : DISK_BALANCER_VERSIONS) {
+      if (candidate.getVersion() == version) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Looks up a version by its textual form, ignoring case.
+   *
+   * @param versionStr text compared against each constant's
+   * {@link #toString()}
+   * @return the matching constant, or {@code null} when no constant matches
+   * (including when {@code versionStr} is {@code null})
+   */
+  public static DiskBalancerVersion getDiskBalancerVersion(String versionStr) {
+    for (DiskBalancerVersion candidate : DISK_BALANCER_VERSIONS) {
+      if (candidate.toString().equalsIgnoreCase(versionStr)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return version number.
+   */
+  public int getVersion() {
+    return version;
+  }
+
+  /**
+   * @return description.
+   */
+  public String getDescription() {
+    return description;
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
new file mode 100644
index 0000000000..d16eb65747
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Class for creating diskbalancer.info file in yaml format.
+ */
+
+public final class DiskBalancerYaml {
+
+  private DiskBalancerYaml() {
+    // static helper methods only, no state.
+  }
+
+  /**
+   * Creates a yaml file to store DiskBalancer info.
+   *
+   * @param diskBalancerInfo {@link DiskBalancerInfo}
+   * @param path Path to diskBalancer.info file
+   * @throws IOException if the file cannot be opened or written
+   */
+  public static void createDiskBalancerInfoFile(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    DumperOptions options = new DumperOptions();
+    options.setPrettyFlow(true);
+    options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW);
+    Yaml yaml = new Yaml(options);
+
+    try (Writer writer = new OutputStreamWriter(
+        new FileOutputStream(path), StandardCharsets.UTF_8)) {
+      yaml.dump(getDiskBalancerInfoYaml(diskBalancerInfo), writer);
+    }
+  }
+
+  /**
+   * Reads a {@link DiskBalancerInfo} back from a yaml file.
+   *
+   * @param path yaml file to read
+   * @return the info described by the file
+   * @throws IOException if the file cannot be read, cannot be parsed, or
+   * contains no yaml document
+   */
+  public static DiskBalancerInfo readDiskBalancerInfoFile(File path)
+      throws IOException {
+    DiskBalancerInfoYaml diskBalancerInfoYaml;
+
+    try (FileInputStream inputFileStream = new FileInputStream(path)) {
+      Yaml yaml = new Yaml();
+      try {
+        diskBalancerInfoYaml =
+            yaml.loadAs(inputFileStream, DiskBalancerInfoYaml.class);
+      } catch (Exception e) {
+        throw new IOException("Unable to parse yaml file.", e);
+      }
+    }
+
+    // An empty file parses to null rather than failing; report that as an
+    // IOException instead of a NullPointerException further down.
+    if (diskBalancerInfoYaml == null) {
+      throw new IOException(
+          "Unable to parse yaml file: " + path + " contains no document.");
+    }
+
+    // NOTE(review): getDiskBalancerVersion returns null for a version
+    // number it does not know, so the returned info may carry a null
+    // version -- confirm DiskBalancerInfo tolerates that.
+    return new DiskBalancerInfo(
+        diskBalancerInfoYaml.isShouldRun(),
+        diskBalancerInfoYaml.getThreshold(),
+        diskBalancerInfoYaml.getBandwidthInMB(),
+        diskBalancerInfoYaml.getParallelThread(),
+        DiskBalancerVersion.getDiskBalancerVersion(
+            diskBalancerInfoYaml.getVersion()));
+  }
+
+  /**
+   * Datanode DiskBalancer Info to be written to the yaml file.
+   */
+  public static class DiskBalancerInfoYaml {
+    private boolean shouldRun;
+    private double threshold;
+    private long bandwidthInMB;
+    private int parallelThread;
+
+    private int version;
+
+    public DiskBalancerInfoYaml() {
+      // Needed for snake-yaml introspection.
+    }
+
+    private DiskBalancerInfoYaml(boolean shouldRun, double threshold,
+        long bandwidthInMB, int parallelThread, int version) {
+      this.shouldRun = shouldRun;
+      this.threshold = threshold;
+      this.bandwidthInMB = bandwidthInMB;
+      this.parallelThread = parallelThread;
+      this.version = version;
+    }
+
+    public boolean isShouldRun() {
+      return shouldRun;
+    }
+
+    public void setShouldRun(boolean shouldRun) {
+      this.shouldRun = shouldRun;
+    }
+
+    public void setThreshold(double threshold) {
+      this.threshold = threshold;
+    }
+
+    public double getThreshold() {
+      return threshold;
+    }
+
+    public void setBandwidthInMB(long bandwidthInMB) {
+      this.bandwidthInMB = bandwidthInMB;
+    }
+
+    public long getBandwidthInMB() {
+      return this.bandwidthInMB;
+    }
+
+    public void setParallelThread(int parallelThread) {
+      this.parallelThread = parallelThread;
+    }
+
+    public int getParallelThread() {
+      return this.parallelThread;
+    }
+
+    public void setVersion(int version) {
+      this.version = version;
+    }
+
+    public int getVersion() {
+      return this.version;
+    }
+  }
+
+  /**
+   * Copies the fields of a {@link DiskBalancerInfo} into the bean that is
+   * dumped to yaml; the version is stored as its numeric value.
+   */
+  private static DiskBalancerInfoYaml getDiskBalancerInfoYaml(
+      DiskBalancerInfo diskBalancerInfo) {
+
+    return new DiskBalancerInfoYaml(
+        diskBalancerInfo.isShouldRun(),
+        diskBalancerInfo.getThreshold(),
+        diskBalancerInfo.getBandwidthInMB(),
+        diskBalancerInfo.getParallelThread(),
+        diskBalancerInfo.getVersion().getVersion());
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java
new file mode 100644
index 0000000000..d8ba71eb9d
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ This package contains classes related to the DiskBalancer service.
+ */
+package org.apache.hadoop.ozone.container.diskbalancer;
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 14a89bb867..27d81675ec 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -29,6 +29,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -54,6 +55,8 @@ import
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import
org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService;
import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
@@ -115,6 +118,7 @@ public class OzoneContainer {
private final StaleRecoveringContainerScrubbingService
recoveringContainerScrubbingService;
private final GrpcTlsConfig tlsClientConfig;
+ private final DiskBalancerService diskBalancerService;
private final AtomicReference<InitializingStatus> initializingStatus;
private final ReplicationServer replicationServer;
private DatanodeDetails datanodeDetails;
@@ -228,6 +232,15 @@ public class OzoneContainer {
blockDeletingServiceTimeout, TimeUnit.MILLISECONDS,
blockDeletingServiceWorkerSize, config);
+ Duration diskBalancerSvcInterval = conf.getObject(
+ DiskBalancerConfiguration.class).getDiskBalancerInterval();
+ Duration diskBalancerSvcTimeout = conf.getObject(
+ DiskBalancerConfiguration.class).getDiskBalancerTimeout();
+ diskBalancerService =
+ new DiskBalancerService(this, diskBalancerSvcInterval.toMillis(),
+ diskBalancerSvcTimeout.toMillis(), TimeUnit.MILLISECONDS, 1,
+ config);
+
Duration recoveringContainerScrubbingSvcInterval = conf.getObject(
DatanodeConfiguration.class).getRecoveringContainerScrubInterval();
@@ -407,6 +420,7 @@ public class OzoneContainer {
hddsDispatcher.init();
hddsDispatcher.setClusterId(clusterId);
blockDeletingService.start();
+ diskBalancerService.start();
recoveringContainerScrubbingService.start();
// mark OzoneContainer as INITIALIZED.
@@ -432,6 +446,7 @@ public class OzoneContainer {
dbVolumeSet.shutdown();
}
blockDeletingService.shutdown();
+ diskBalancerService.shutdown();
recoveringContainerScrubbingService.shutdown();
ContainerMetrics.remove();
}
@@ -524,7 +539,14 @@ public class OzoneContainer {
}
public DiskBalancerReportProto getDiskBalancerReport() {
- // TODO: Return real disk balancer report
- return null;
+ return diskBalancerService.getDiskBalancerReportProto();
+ }
+
+ public DiskBalancerInfo getDiskBalancerInfo() {
+ return diskBalancerService.getDiskBalancerInfo();
+ }
+
+ public DiskBalancerService getDiskBalancerService() {
+ return diskBalancerService;
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
index ea97f34ca0..1780877787 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
@@ -29,12 +30,12 @@ import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
*/
public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> {
- private final boolean shouldRun;
+ private final HddsProtos.DiskBalancerOpType opType;
private final DiskBalancerConfiguration diskBalancerConfiguration;
- public DiskBalancerCommand(final boolean shouldRun,
+ public DiskBalancerCommand(final HddsProtos.DiskBalancerOpType opType,
final DiskBalancerConfiguration diskBalancerConfiguration) {
- this.shouldRun = shouldRun;
+ this.opType = opType;
this.diskBalancerConfiguration = diskBalancerConfiguration;
}
@@ -50,22 +51,26 @@ public class DiskBalancerCommand extends
SCMCommand<DiskBalancerCommandProto> {
@Override
public DiskBalancerCommandProto getProto() {
- return DiskBalancerCommandProto.newBuilder()
- .setShouldRun(shouldRun)
- .setDiskBalancerConf(diskBalancerConfiguration.toProtobufBuilder())
- .build();
+ DiskBalancerCommandProto.Builder builder = DiskBalancerCommandProto
+ .newBuilder().setOpType(opType);
+ // STOP commands carry no diskBalancerConf, so only set it when present.
+ if (diskBalancerConfiguration != null) {
+ builder.setDiskBalancerConf(
+ diskBalancerConfiguration.toProtobufBuilder());
+ }
+ return builder.build();
}
public static DiskBalancerCommand getFromProtobuf(DiskBalancerCommandProto
diskbalancerCommandProto, ConfigurationSource configuration) {
Preconditions.checkNotNull(diskbalancerCommandProto);
- return new DiskBalancerCommand(diskbalancerCommandProto.getShouldRun(),
+ return new DiskBalancerCommand(diskbalancerCommandProto.getOpType(),
DiskBalancerConfiguration.fromProtobuf(
diskbalancerCommandProto.getDiskBalancerConf(), configuration));
}
- public boolean isShouldRun() {
- return shouldRun;
+ public HddsProtos.DiskBalancerOpType getOpType() {
+ return opType;
}
public DiskBalancerConfiguration getDiskBalancerConfiguration() {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index c09261d4ab..8318f14b47 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -246,12 +246,12 @@ public class TestReportPublisher {
publisher.init(dummyContext, executorService);
Message report =
((DiskBalancerReportPublisher) publisher).getReport();
- Assert.assertNotNull(report);
+ Assertions.assertNotNull(report);
for (Descriptors.FieldDescriptor descriptor :
report.getDescriptorForType().getFields()) {
if (descriptor.getNumber() ==
DiskBalancerReportProto.ISRUNNING_FIELD_NUMBER) {
- Assert.assertEquals(true, report.getField(descriptor));
+ Assertions.assertEquals(true, report.getField(descriptor));
}
}
executorService.shutdown();
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
new file mode 100644
index 0000000000..0d87ac3420
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A test class implementation for {@link DiskBalancerService}.
+ */
+public class DiskBalancerServiceTestImpl extends DiskBalancerService {
+
+  // the service timeout
+  private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0;
+
+  // how long the control thread waits for one triggered run to finish
+  private static final long TASK_COMPLETION_TIMEOUT_SECONDS = 3;
+
+  // tests only; both fields are written by one thread and read by another,
+  // hence volatile.
+  private volatile CountDownLatch latch;
+  private volatile Thread testingThread;
+  private final AtomicInteger numOfProcessed = new AtomicInteger(0);
+
+  public DiskBalancerServiceTestImpl(OzoneContainer container,
+      int serviceInterval, ConfigurationSource conf, int threadCount)
+      throws IOException {
+    super(container, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS,
+        TimeUnit.MILLISECONDS, threadCount, conf);
+  }
+
+  /**
+   * Releases the control thread so that it submits one periodical task.
+   *
+   * @throws IllegalStateException if the control thread has not created its
+   * latch yet, or the current latch has already been released
+   */
+  public void runBalanceTasks() {
+    CountDownLatch current = latch;
+    if (current == null) {
+      throw new IllegalStateException("Service is not started");
+    }
+    if (current.getCount() > 0) {
+      current.countDown();
+    } else {
+      throw new IllegalStateException("Count already reaches zero");
+    }
+  }
+
+  /**
+   * @return true once the control thread has created its first latch and is
+   * still alive.
+   */
+  public boolean isStarted() {
+    Thread thread = testingThread;
+    return latch != null && thread != null && thread.isAlive();
+  }
+
+  /**
+   * @return number of triggered runs that completed within the timeout.
+   */
+  public int getTimesOfProcessed() {
+    return numOfProcessed.get();
+  }
+
+  // Override the implementation to start a single on-call control thread.
+  @Override
+  public void start() {
+    PeriodicalTask svc = new PeriodicalTask();
+    // In test mode, relies on a latch countdown (runBalanceTasks) to run
+    // the tasks.
+    Runnable r = () -> {
+      while (true) {
+        CountDownLatch gate = new CountDownLatch(1);
+        latch = gate;
+        try {
+          gate.await();
+        } catch (InterruptedException e) {
+          // shutdown() interrupts this thread to make it exit.
+          break;
+        }
+        Future<?> future = this.getExecutorService().submit(svc);
+        try {
+          // for tests, we only wait for 3s for completion
+          future.get(TASK_COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+          numOfProcessed.incrementAndGet();
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) {
+            // keep the interrupt visible to whoever inspects this thread
+            Thread.currentThread().interrupt();
+          }
+          e.printStackTrace();
+          return;
+        }
+      }
+    };
+
+    testingThread = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .build()
+        .newThread(r);
+    testingThread.start();
+  }
+
+  @Override
+  public void shutdown() {
+    // start() may never have been called; only interrupt an existing thread.
+    Thread thread = testingThread;
+    if (thread != null) {
+      thread.interrupt();
+    }
+    super.shutdown();
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
new file mode 100644
index 0000000000..a58f00103b
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.TestBlockDeletingService;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import static
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * This is a test class for DiskBalancerService.
+ */
+@RunWith(Parameterized.class)
+public class TestDiskBalancerService {
+  private File testRoot;
+  private String scmId;
+  private String datanodeUuid;
+  private OzoneConfiguration conf;
+
+  private final ContainerLayoutVersion layout;
+  private final String schemaVersion;
+  private MutableVolumeSet volumeSet;
+
+  public TestDiskBalancerService(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ContainerTestVersionInfo.versionParameters();
+  }
+
+  /**
+   * Prepares a clean scratch directory holding two data volumes and builds
+   * the volume set used by the test.
+   */
+  @Before
+  public void init() throws IOException {
+    // Name the scratch directory after this class: it is cleaned here and
+    // deleted in cleanup(), which must not touch another test's files.
+    testRoot = GenericTestUtils
+        .getTestDir(TestDiskBalancerService.class.getSimpleName());
+    if (testRoot.exists()) {
+      FileUtils.cleanDirectory(testRoot);
+    }
+    scmId = UUID.randomUUID().toString();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        generateVolumeLocation(testRoot.getAbsolutePath(), 2));
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
+    conf.set("hdds.datanode.du.factory.classname",
+        "org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory$HalfTera");
+    datanodeUuid = UUID.randomUUID().toString();
+    volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    BlockUtils.shutdownCache(conf);
+    FileUtils.deleteDirectory(testRoot);
+  }
+
+  /**
+   * Checks that values set on the service are reflected by
+   * getDiskBalancerInfo(), and that refresh() replaces them.
+   */
+  // This class runs under the JUnit 4 Parameterized runner, so the timeout
+  // has to be the JUnit 4 one (milliseconds).
+  @Test(timeout = 30000)
+  public void testUpdateService() throws Exception {
+    // Increase volume's usedBytes
+    for (StorageVolume volume : volumeSet.getVolumeMap().values()) {
+      volume.incrementUsedSpace(volume.getCapacity() / 2);
+    }
+
+    ContainerSet containerSet = new ContainerSet(1000);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    KeyValueHandler keyValueHandler =
+        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
+            metrics, c -> {
+        });
+    DiskBalancerServiceTestImpl svc =
+        getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1);
+
+    // Shut the service down even when an assertion below fails.
+    try {
+      // Set a low bandwidth to delay job
+      svc.setShouldRun(true);
+      svc.setThreshold(10.0d);
+      svc.setBandwidthInMB(1L);
+      svc.setParallelThread(5);
+      svc.setVersion(DiskBalancerVersion.DEFAULT_VERSION);
+
+      svc.start();
+
+      Assert.assertTrue(svc.getDiskBalancerInfo().isShouldRun());
+      Assert.assertEquals(10, svc.getDiskBalancerInfo().getThreshold(), 0.0);
+      Assert.assertEquals(1, svc.getDiskBalancerInfo().getBandwidthInMB());
+      Assert.assertEquals(5, svc.getDiskBalancerInfo().getParallelThread());
+
+      DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10);
+      svc.refresh(newInfo);
+
+      Assert.assertFalse(svc.getDiskBalancerInfo().isShouldRun());
+      Assert.assertEquals(20, svc.getDiskBalancerInfo().getThreshold(), 0.0);
+      Assert.assertEquals(5, svc.getDiskBalancerInfo().getBandwidthInMB());
+      Assert.assertEquals(10, svc.getDiskBalancerInfo().getParallelThread());
+    } finally {
+      svc.shutdown();
+    }
+  }
+
+  /**
+   * Builds a comma-separated list {@code base/vol0,base/vol1,...} with
+   * {@code volumeCount} entries; empty when {@code volumeCount} is 0.
+   */
+  private String generateVolumeLocation(String base, int volumeCount) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < volumeCount; i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(base).append("/vol").append(i);
+    }
+    return sb.toString();
+  }
+
+  private DiskBalancerServiceTestImpl getDiskBalancerService(
+      ContainerSet containerSet, ConfigurationSource config,
+      KeyValueHandler keyValueHandler, ContainerController controller,
+      int threadCount) throws IOException {
+    OzoneContainer ozoneContainer =
+        mockDependencies(containerSet, keyValueHandler, controller);
+    return new DiskBalancerServiceTestImpl(ozoneContainer, 1000, config,
+        threadCount);
+  }
+
+  /**
+   * Creates an OzoneContainer mock that hands out the given container set,
+   * handler and controller together with this test's volume set.
+   */
+  private OzoneContainer mockDependencies(ContainerSet containerSet,
+      KeyValueHandler keyValueHandler, ContainerController controller) {
+    OzoneContainer ozoneContainer = mock(OzoneContainer.class);
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    when(ozoneContainer.getWriteChannel()).thenReturn(null);
+    ContainerDispatcher dispatcher = mock(ContainerDispatcher.class);
+    when(ozoneContainer.getDispatcher()).thenReturn(dispatcher);
+    when(dispatcher.getHandler(any())).thenReturn(keyValueHandler);
+    when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet);
+    when(ozoneContainer.getController()).thenReturn(controller);
+    return ozoneContainer;
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
new file mode 100644
index 0000000000..3b61255b0b
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static
org.apache.hadoop.ozone.OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT;
+
+/**
+ * Tests to test DiskBalancer's YAML operation.
+ */
+public class TestDiskBalancerYaml {
+  /**
+   * Writes a DiskBalancerInfo to a yaml file, reads it back and expects the
+   * result to equal what was written.
+   */
+  @Test
+  public void testCreateYaml() throws IOException {
+    File yamlFile = new File(GenericTestUtils.getTestDir(),
+        OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT);
+
+    // shouldRun, threshold, bandwidthInMB, parallelThread, version
+    DiskBalancerInfo written = new DiskBalancerInfo(true, 10.0d, 100L, 5,
+        DiskBalancerVersion.DEFAULT_VERSION);
+    DiskBalancerYaml.createDiskBalancerInfoFile(written, yamlFile);
+
+    DiskBalancerInfo readBack =
+        DiskBalancerYaml.readDiskBalancerInfoFile(yamlFile);
+
+    Assertions.assertEquals(written, readBack);
+  }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 936e31d760..c06ddc31bf 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -45,7 +45,6 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
-import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpType;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
@@ -1085,7 +1084,7 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
- .setOpType(DatanodeDiskBalancerOpType.start)
+ .setOpType(HddsProtos.DiskBalancerOpType.START)
.setConf(confBuilder);
hosts.ifPresent(requestBuilder::addAllHosts);
@@ -1107,7 +1106,7 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
throws IOException {
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
- .setOpType(DatanodeDiskBalancerOpType.stop);
+ .setOpType(HddsProtos.DiskBalancerOpType.STOP);
hosts.ifPresent(requestBuilder::addAllHosts);
DatanodeDiskBalancerOpResponseProto response =
@@ -1136,7 +1135,7 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
- .setOpType(DatanodeDiskBalancerOpType.update)
+ .setOpType(HddsProtos.DiskBalancerOpType.UPDATE)
.setConf(confBuilder);
hosts.ifPresent(requestBuilder::addAllHosts);
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index d6c552c4b4..7512ce06a9 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -351,14 +351,8 @@ message DatanodeDiskBalancerInfoResponseProto {
repeated DatanodeDiskBalancerInfoProto info = 1;
}
-enum DatanodeDiskBalancerOpType{
- start = 1;
- stop = 2;
- update = 3;
-}
-
message DatanodeDiskBalancerOpRequestProto {
- required DatanodeDiskBalancerOpType opType = 1;
+ required DiskBalancerOpType opType = 1;
repeated string hosts = 2;
optional DiskBalancerConfigurationProto conf = 3;
}
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 231e105aea..5b2cc43042 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -465,6 +465,12 @@ enum DiskBalancerRunningStatus {
UNKNOWN = 3;
}
+// Operation requested of the disk balancer. Carried as the opType field of
+// DatanodeDiskBalancerOpRequestProto (admin protocol) and of
+// DiskBalancerCommandProto (SCM to datanode heartbeat protocol).
+enum DiskBalancerOpType{
+ START = 1;
+ STOP = 2;
+ UPDATE = 3;
+}
+
message DatanodeDiskBalancerInfoProto {
required DatanodeDetailsProto node = 1;
required double currentVolumeDensitySum = 2;
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 81e9ee54d3..f7893f2d60 100644
---
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -449,7 +449,7 @@ message DatanodeDetailsAndReplicaIndexProto {
This command asks the datanode to update diskBalancer status
*/
message DiskBalancerCommandProto {
- required bool shouldRun = 1;
+ required DiskBalancerOpType opType = 1;
optional DiskBalancerConfigurationProto diskBalancerConf = 2;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index c6114c52f5..1af9bfde4c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -24,10 +24,15 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,6 +131,118 @@ public class DiskBalancerManager {
}
}
+  /**
+   * Send startDiskBalancer command to datanodes.
+   * If hosts is not specified (absent or an empty list), send commands to
+   * all in-service healthy datanodes.
+   *
+   * @param threshold new configuration of threshold
+   * @param bandwidthInMB new configuration of bandwidthInMB
+   * @param parallelThread new configuration of parallelThread
+   * @param hosts Datanodes that command will apply on
+   * @return one error per datanode that was skipped because it is not
+   *         healthy, or for which building/sending the command threw
+   * @throws IOException presumably when {@code hosts} cannot be resolved to
+   *         datanodes; per-datanode failures are returned, not thrown
+   */
+  public List<DatanodeAdminError> startDiskBalancer(
+      Optional<Double> threshold, Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
+      throws IOException {
+    List<DatanodeDetails> dns;
+    // The RPC translator wraps the (possibly empty) repeated hosts field in
+    // Optional.of(...), so an empty list also means "no hosts specified".
+    if (hosts.isPresent() && !hosts.get().isEmpty()) {
+      dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+          useHostnames);
+    } else {
+      dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
+    }
+
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeDetails dn : dns) {
+      try {
+        // Explicitly named hosts may be unhealthy: report them and do not
+        // start the balancer there.
+        if (!nodeManager.getNodeStatus(dn).isHealthy()) {
+          errors.add(new DatanodeAdminError(dn.getHostName(),
+              "Datanode not in healthy state"));
+          continue;
+        }
+        // If command doesn't have configuration change, then we reuse the
+        // latest configuration reported from Datanodes
+        DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn,
+            threshold, bandwidthInMB, parallelThread);
+        DiskBalancerCommand command = new DiskBalancerCommand(
+            HddsProtos.DiskBalancerOpType.START, updateConf);
+        sendCommand(dn, command);
+      } catch (Exception e) {
+        // Keep going: a failure on one datanode must not block the others.
+        errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage()));
+      }
+    }
+    return errors;
+  }
+
+ /**
+ * Send stopDiskBalancer command to datanodes
+ * If hosts is not specified, send commands to all datanodes.
+ * @param hosts Datanodes that command will apply on
+ * */
+ public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>>
hosts)
+ throws IOException {
+ List<DatanodeDetails> dns;
+ if (hosts.isPresent()) {
+ dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+ useHostnames);
+ } else {
+ dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
+ }
+
+ List<DatanodeAdminError> errors = new ArrayList<>();
+ for (DatanodeDetails dn : dns) {
+ try {
+ DiskBalancerCommand command = new DiskBalancerCommand(
+ HddsProtos.DiskBalancerOpType.STOP, null);
+ sendCommand(dn, command);
+ } catch (Exception e) {
+ errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage()));
+ }
+ }
+ return errors;
+ }
+
+
+  /**
+   * Send update DiskBalancerConf command to datanodes.
+   * If hosts is not specified (absent or an empty list), send commands to
+   * all in-service healthy datanodes.
+   *
+   * @param threshold new configuration of threshold
+   * @param bandwidthInMB new configuration of bandwidthInMB
+   * @param parallelThread new configuration of parallelThread
+   * @param hosts Datanodes that command will apply on
+   * @return one error per datanode for which building/sending the command
+   *         threw
+   * @throws IOException presumably when {@code hosts} cannot be resolved to
+   *         datanodes; per-datanode failures are returned, not thrown
+   */
+  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+      Optional<Double> threshold, Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
+      throws IOException {
+    List<DatanodeDetails> dns;
+    // The RPC translator wraps the (possibly empty) repeated hosts field in
+    // Optional.of(...), so an empty list also means "no hosts specified".
+    if (hosts.isPresent() && !hosts.get().isEmpty()) {
+      dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+          useHostnames);
+    } else {
+      dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
+    }
+
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeDetails dn : dns) {
+      try {
+        // If command doesn't have configuration change, then we reuse the
+        // latest configuration reported from Datanodes
+        DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn,
+            threshold, bandwidthInMB, parallelThread);
+        DiskBalancerCommand command = new DiskBalancerCommand(
+            HddsProtos.DiskBalancerOpType.UPDATE, updateConf);
+        sendCommand(dn, command);
+      } catch (Exception e) {
+        // Keep going: a failure on one datanode must not block the others.
+        errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage()));
+      }
+    }
+    return errors;
+  }
+
private boolean shouldReturnDatanode(
HddsProtos.DiskBalancerRunningStatus status,
DatanodeDetails datanodeDetails) {
@@ -218,6 +335,30 @@ public class DiskBalancerManager {
}
}
+ private DiskBalancerConfiguration attachDiskBalancerConf(
+ DatanodeDetails dn, Optional<Double> threshold,
+ Optional<Long> bandwidthInMB, Optional<Integer> parallelThread) {
+ DiskBalancerConfiguration baseConf = statusMap.containsKey(dn) ?
+ statusMap.get(dn).getDiskBalancerConfiguration() :
+ new DiskBalancerConfiguration();
+ threshold.ifPresent(baseConf::setThreshold);
+ bandwidthInMB.ifPresent(baseConf::setDiskBandwidthInMB);
+ parallelThread.ifPresent(baseConf::setParallelThread);
+ return baseConf;
+ }
+
+  /**
+   * Stamps the command with the current leader term and publishes it as a
+   * DATANODE_COMMAND event addressed to the given datanode. When obtaining
+   * the term raises NotLeaderException, the command is dropped with a
+   * warning and nothing is reported to the caller.
+   *
+   * @param dn datanode the command is addressed to
+   * @param command disk balancer command to publish
+   */
+  private void sendCommand(DatanodeDetails dn, DiskBalancerCommand command) {
+    try {
+      command.setTerm(scmContext.getTermOfLeader());
+      // Only reached once the leader term has been attached.
+      scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+          new CommandForDatanode<>(dn.getUuid(), command));
+    } catch (NotLeaderException notLeader) {
+      LOG.warn("Skip sending DiskBalancerCommand for Datanode {},"
+          + " since not leader SCM.", dn.getUuidString());
+    }
+  }
+
@VisibleForTesting
public Map<DatanodeDetails, DiskBalancerStatus> getStatusMap() {
return statusMap;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 38a2503b55..720cab09d6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1222,21 +1222,21 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
throws IOException {
List<DatanodeAdminError> errors;
switch (request.getOpType()) {
- case start:
+ case START:
errors = impl.startDiskBalancer(
Optional.of(request.getConf().getThreshold()),
Optional.of(request.getConf().getDiskBandwidthInMB()),
Optional.of(request.getConf().getParallelThread()),
Optional.of(request.getHostsList()));
break;
- case update:
+ case UPDATE:
errors = impl.updateDiskBalancerConfiguration(
Optional.of(request.getConf().getThreshold()),
Optional.of(request.getConf().getDiskBandwidthInMB()),
Optional.of(request.getConf().getParallelThread()),
Optional.of(request.getHostsList()));
break;
- case stop:
+ case STOP:
errors = impl.stopDiskBalancer(Optional.of(request.getHostsList()));
break;
default:
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index a20edaa05a..5d66bb597e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1126,6 +1126,7 @@ public class SCMClientProtocolServer implements
ContainerID.valueOf(startContainerID), count, state);
}
+ @Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
int count, int clientVersion) throws IOException {
// check admin authorisation
@@ -1161,15 +1162,27 @@ public class SCMClientProtocolServer implements
public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
Optional<List<String>> hosts) throws IOException {
- // TODO: Send message to datanodes
- return new ArrayList<DatanodeAdminError>();
+ try {
+ getScm().checkAdminAccess(getRemoteUser());
+ } catch (IOException e) {
+ LOG.error("Authorization failed", e);
+ throw e;
+ }
+
+ return scm.getDiskBalancerManager()
+ .startDiskBalancer(threshold, bandwidthInMB, parallelThread, hosts);
}
@Override
public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>>
hosts)
throws IOException {
- // TODO: Send message to datanodes
- return new ArrayList<DatanodeAdminError>();
+ try {
+ getScm().checkAdminAccess(getRemoteUser());
+ } catch (IOException e) {
+ LOG.error("Authorization failed", e);
+ throw e;
+ }
+ return scm.getDiskBalancerManager().stopDiskBalancer(hosts);
}
@@ -1178,8 +1191,15 @@ public class SCMClientProtocolServer implements
Optional<Double> threshold, Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread, Optional<List<String>> hosts)
throws IOException {
- // TODO: Send message to datanodes
- return new ArrayList<DatanodeAdminError>();
+ try {
+ getScm().checkAdminAccess(getRemoteUser());
+ } catch (IOException e) {
+ LOG.error("Authorization failed", e);
+ throw e;
+ }
+
+ return scm.getDiskBalancerManager().updateDiskBalancerConfiguration(
+ threshold, bandwidthInMB, parallelThread, hosts);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]