This is an automated email from the ASF dual-hosted git repository.

yiyang0203 pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 35775575f18e04735dcd36d2a74637fc9a6f2be5
Author: Symious <[email protected]>
AuthorDate: Thu Feb 23 17:10:58 2023 +0800

    HDDS-7383. Basic framework of DiskBalancerService (#3874)
---
 .../scm/storage/DiskBalancerConfiguration.java     |  85 ++++-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   3 +
 .../hadoop/hdds/fs/MockSpaceUsageCheckFactory.java |  16 +
 .../commandhandler/DiskBalancerCommandHandler.java |  39 +-
 .../container/diskbalancer/DiskBalancerInfo.java   | 130 +++++++
 .../diskbalancer/DiskBalancerService.java          | 397 +++++++++++++++++++++
 .../diskbalancer/DiskBalancerServiceMetrics.java   | 138 +++++++
 .../diskbalancer/DiskBalancerVersion.java          |  79 ++++
 .../container/diskbalancer/DiskBalancerYaml.java   | 165 +++++++++
 .../ozone/container/diskbalancer/package-info.java |  22 ++
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  26 +-
 .../protocol/commands/DiskBalancerCommand.java     |  25 +-
 .../diskbalancer/DiskBalancerServiceTestImpl.java  | 104 ++++++
 .../diskbalancer/TestDiskBalancerService.java      | 173 +++++++++
 .../diskbalancer/TestDiskBalancerYaml.java         |  53 +++
 ...inerLocationProtocolClientSideTranslatorPB.java |   7 +-
 .../src/main/proto/ScmAdminProtocol.proto          |   8 +-
 .../interface-client/src/main/proto/hdds.proto     |   6 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   2 +-
 .../hadoop/hdds/scm/node/DiskBalancerManager.java  | 141 ++++++++
 ...inerLocationProtocolServerSideTranslatorPB.java |   6 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  32 +-
 .../cli/datanode/DiskBalancerStartSubcommand.java  |   2 +-
 .../cli/datanode/DiskBalancerUpdateSubcommand.java |   2 +-
 24 files changed, 1624 insertions(+), 37 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
index e460e0b6eb..ed40fa30f6 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -28,6 +28,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+
 /**
  * This class contains configuration values for the DiskBalancer.
  */
@@ -36,6 +41,17 @@ public final class DiskBalancerConfiguration {
   private static final Logger LOG =
       LoggerFactory.getLogger(DiskBalancerConfiguration.class);
 
+  @Config(key = "info.dir", type = ConfigType.STRING,
+      defaultValue = "", tags = {ConfigTag.DISKBALANCER},
+      description = "The path where datanode diskBalancer's conf is to be " +
+          "written to. if this property is not defined, ozone will fall " +
+          "back to use metadata directory instead.")
+  private String infoDir;
+
+  public String getDiskBalancerInfoDir() {
+    return infoDir;
+  }
+
   @Config(key = "volume.density.threshold", type = ConfigType.DOUBLE,
       defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
       description = "Threshold is a percentage in the range of 0 to 100. A " +
@@ -55,6 +71,72 @@ public final class DiskBalancerConfiguration {
       description = "The max parallel balance thread count.")
   private int parallelThread = 5;
 
+  @Config(key = "should.run.default",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      tags = { DATANODE, ConfigTag.DISKBALANCER},
+      description =
+          "If DiskBalancer fails to get information from diskbalancer.info, " +
+              "it will choose this value to decide if this service should be " +
+              "running."
+  )
+  private boolean diskBalancerShouldRun = false;
+
+  public boolean getDiskBalancerShouldRun() {
+    return diskBalancerShouldRun;
+  }
+
+  public void setDiskBalancerShouldRun(boolean shouldRun) {
+    this.diskBalancerShouldRun = shouldRun;
+  }
+
+  @Config(key = "service.interval",
+      defaultValue = "60s",
+      type = ConfigType.TIME,
+      tags = { DATANODE, ConfigTag.DISKBALANCER},
+      description = "Time interval of the Datanode DiskBalancer service. " +
+          "The Datanode will check the service periodically and update " +
+          "the config and running status for DiskBalancer service. " +
+          "Unit could be defined with postfix (ns,ms,s,m,h,d). "
+  )
+  private long diskBalancerInterval = Duration.ofSeconds(60).toMillis();
+
+  public Duration getDiskBalancerInterval() {
+    return Duration.ofMillis(diskBalancerInterval);
+  }
+
+  public void setDiskBalancerInterval(Duration duration) {
+    this.diskBalancerInterval = duration.toMillis();
+  }
+
+  @Config(key = "service.timeout",
+      defaultValue = "300s",
+      type = ConfigType.TIME,
+      tags = { DATANODE, ConfigTag.DISKBALANCER},
+      description = "Timeout for the Datanode DiskBalancer service. "
+          + "Unit could be defined with postfix (ns,ms,s,m,h,d). "
+  )
+  private long diskBalancerTimeout = Duration.ofSeconds(300).toMillis();
+
+  public Duration getDiskBalancerTimeout() {
+    return Duration.ofMillis(diskBalancerTimeout);
+  }
+
+  public void setDiskBalancerTimeout(Duration duration) {
+    this.diskBalancerTimeout = duration.toMillis();
+  }
+
+  public DiskBalancerConfiguration() {
+  }
+
+  public DiskBalancerConfiguration(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread) {
+    threshold.ifPresent(aDouble -> this.threshold = aDouble);
+    bandwidthInMB.ifPresent(aLong -> this.diskBandwidthInMB = aLong);
+    parallelThread.ifPresent(integer -> this.parallelThread = integer);
+  }
+
   /**
    * Gets the threshold value for DiskBalancer.
    *
@@ -86,7 +168,8 @@ public final class DiskBalancerConfiguration {
    *
    * @return max disk bandwidth per second
    */
-  public double getDiskBandwidthInMB() {
+
+  public long getDiskBandwidthInMB() {
     return diskBandwidthInMB;
   }
 
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index f3c08b252b..ad12bfa6ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -241,6 +241,9 @@ public final class OzoneConsts {
    */
   public static final String OZONE_SCM_DATANODE_ID_FILE_DEFAULT = 
"datanode.id";
 
+  public static final String
+      OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT = "diskBalancer.info";
+
   // The ServiceListJSONServlet context attribute where OzoneManager
   // instance gets stored.
   public static final String OM_CONTEXT_ATTRIBUTE = "ozone.om";
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
index 3b2e6415c2..eaeb3c1b4d 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/MockSpaceUsageCheckFactory.java
@@ -50,6 +50,22 @@ public final class MockSpaceUsageCheckFactory {
     }
   }
 
+  /**
+   * An implementation that never checks space usage but reports a fixed
+   * 512 GB of capacity and free space.  Neither does it persist usage info.
+   */
+  public static class HalfTera implements SpaceUsageCheckFactory {
+    @Override
+    public SpaceUsageCheckParams paramsFor(File dir) {
+      return new SpaceUsageCheckParams(dir,
+          MockSpaceUsageSource.fixed(512L * 1024 * 1024 * 1024,
+              512L * 1024 * 1024 * 1024),
+          Duration.ZERO,
+          SpaceUsagePersistence.None.INSTANCE
+      );
+    }
+  }
+
   private MockSpaceUsageCheckFactory() {
     throw new UnsupportedOperationException("no instances");
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
index 37f275dbb4..6ff745cafe 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
@@ -16,15 +16,21 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -56,7 +62,38 @@ public class DiskBalancerCommandHandler implements 
CommandHandler {
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
     invocationCount.incrementAndGet();
-    // TODO: Do start/stop/update operation
+    final long startTime = Time.monotonicNow();
+    DiskBalancerCommand diskBalancerCommand = (DiskBalancerCommand) command;
+
+    final HddsProtos.DiskBalancerOpType opType =
+        diskBalancerCommand.getOpType();
+    final DiskBalancerConfiguration diskBalancerConf =
+        diskBalancerCommand.getDiskBalancerConfiguration();
+
+    DiskBalancerInfo diskBalancerInfo = ozoneContainer.getDiskBalancerInfo();
+
+    try {
+      switch (opType) {
+      case START:
+        diskBalancerInfo.setShouldRun(true);
+        diskBalancerInfo.updateFromConf(diskBalancerConf);
+        break;
+      case STOP:
+        diskBalancerInfo.setShouldRun(false);
+        break;
+      case UPDATE:
+        diskBalancerInfo.updateFromConf(diskBalancerConf);
+        break;
+      default:
+        throw new IOException("Unexpected type " + opType);
+      }
+      ozoneContainer.getDiskBalancerService().refresh(diskBalancerInfo);
+    } catch (IOException e) {
+      LOG.error("Can't handle command type: {}", opType, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
new file mode 100644
index 0000000000..873c89aad1
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+
+import java.util.Objects;
+
+/**
+ * DiskBalancer's information to persist.
+ */
+public class DiskBalancerInfo {
+  private boolean shouldRun;
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+  private DiskBalancerVersion version;
+
+  public DiskBalancerInfo(boolean shouldRun, double threshold,
+      long bandwidthInMB, int parallelThread) {
+    this(shouldRun, threshold, bandwidthInMB, parallelThread,
+        DiskBalancerVersion.DEFAULT_VERSION);
+  }
+
+  public DiskBalancerInfo(boolean shouldRun, double threshold,
+      long bandwidthInMB, int parallelThread, DiskBalancerVersion version) {
+    this.shouldRun = shouldRun;
+    this.threshold = threshold;
+    this.bandwidthInMB = bandwidthInMB;
+    this.parallelThread = parallelThread;
+    this.version = version;
+  }
+
+  public DiskBalancerInfo(boolean shouldRun,
+      DiskBalancerConfiguration diskBalancerConf) {
+    this.shouldRun = shouldRun;
+    this.threshold = diskBalancerConf.getThreshold();
+    this.bandwidthInMB = diskBalancerConf.getDiskBandwidthInMB();
+    this.parallelThread = diskBalancerConf.getParallelThread();
+    this.version = DiskBalancerVersion.DEFAULT_VERSION;
+  }
+
+  public void updateFromConf(DiskBalancerConfiguration diskBalancerConf) {
+    if (threshold != diskBalancerConf.getThreshold()) {
+      setThreshold(diskBalancerConf.getThreshold());
+    }
+    if (bandwidthInMB != diskBalancerConf.getDiskBandwidthInMB()) {
+      setBandwidthInMB(diskBalancerConf.getDiskBandwidthInMB());
+    }
+    if (parallelThread != diskBalancerConf.getParallelThread()) {
+      setParallelThread(diskBalancerConf.getParallelThread());
+    }
+  }
+
+  public boolean isShouldRun() {
+    return shouldRun;
+  }
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public double getThreshold() {
+    return threshold;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public long getBandwidthInMB() {
+    return bandwidthInMB;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public int getParallelThread() {
+    return parallelThread;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public DiskBalancerVersion getVersion() {
+    return version;
+  }
+
+  public void setVersion(DiskBalancerVersion version) {
+    this.version = version;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DiskBalancerInfo that = (DiskBalancerInfo) o;
+    return shouldRun == that.shouldRun &&
+        Double.compare(that.threshold, threshold) == 0 &&
+        bandwidthInMB == that.bandwidthInMB &&
+        parallelThread == that.parallelThread &&
+        version == that.version;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread,
+        version);
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
new file mode 100644
index 0000000000..75e1660029
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A per-datanode disk balancing service that takes charge
+ * of moving containers among disks.
+ */
+public class DiskBalancerService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerService.class);
+
+  public static final String DISK_BALANCER_DIR = "diskBalancer";
+
+  private static final String DISK_BALANCER_TMP_DIR = "tmp";
+
+  private OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  private boolean shouldRun = false;
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+
+  private DiskBalancerVersion version;
+
+  private AtomicLong totalBalancedBytes = new AtomicLong(0L);
+  private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
+  private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
+
+  private Map<DiskBalancerTask, Integer> inProgressTasks;
+  private Set<Long> inProgressContainers;
+
+  // Every time a container is decided to be moved from Vol A to Vol B,
+  // the size will be deducted from Vol A and added to Vol B.
+  // This map is used to help calculate the expected storage size after
+  // the container balancing finished.
+  private Map<HddsVolume, Long> deltaSizes;
+  private MutableVolumeSet volumeSet;
+
+  private final File diskBalancerInfoFile;
+
+  private DiskBalancerServiceMetrics metrics;
+
+  public DiskBalancerService(OzoneContainer ozoneContainer,
+      long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
+      int workerSize, ConfigurationSource conf) throws IOException {
+    super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize,
+        serviceCheckTimeout);
+    this.ozoneContainer = ozoneContainer;
+    this.conf = conf;
+
+    String diskBalancerInfoPath = getDiskBalancerInfoPath();
+    Preconditions.checkNotNull(diskBalancerInfoPath);
+    diskBalancerInfoFile = new File(diskBalancerInfoPath);
+
+    inProgressTasks = new ConcurrentHashMap<>();
+    inProgressContainers = ConcurrentHashMap.newKeySet();
+    deltaSizes = new ConcurrentHashMap<>();
+    volumeSet = ozoneContainer.getVolumeSet();
+
+    metrics = DiskBalancerServiceMetrics.create();
+
+    loadDiskBalancerInfo();
+
+    constructTmpDir();
+  }
+
+  /**
+   * Update DiskBalancerService based on new DiskBalancerInfo.
+   * @param diskBalancerInfo
+   * @throws IOException
+   */
+  public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void constructTmpDir() throws IOException {
+    for (HddsVolume volume:
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
+      Path tmpDir = getDiskBalancerTmpDir(volume);
+      try {
+        FileUtils.deleteFully(tmpDir);
+        FileUtils.createDirectories(tmpDir);
+      } catch (IOException ex) {
+        LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
+            ex);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * If the diskBalancer.info file exists, load it. If it does not exist,
+   * fall back to the default configuration.
+   * @throws IOException
+   */
+  private void loadDiskBalancerInfo() throws IOException {
+    DiskBalancerInfo diskBalancerInfo;
+    try {
+      if (diskBalancerInfoFile.exists()) {
+        diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile);
+      } else {
+        boolean shouldRunDefault =
+            conf.getObject(DiskBalancerConfiguration.class)
+                .getDiskBalancerShouldRun();
+        diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault,
+            new DiskBalancerConfiguration());
+      }
+    } catch (IOException e) {
+      LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. " +
+          "Falling back to default configs", e);
+      throw e;
+    }
+
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
+      throws IOException {
+    // First store in local file, then update in memory variables
+    writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
+
+    setShouldRun(diskBalancerInfo.isShouldRun());
+    setThreshold(diskBalancerInfo.getThreshold());
+    setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
+    setParallelThread(diskBalancerInfo.getParallelThread());
+    setVersion(diskBalancerInfo.getVersion());
+
+    // Default executorService is ScheduledThreadPoolExecutor, so we can
+    // update the poll size by setting corePoolSize.
+    if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) {
+      ((ScheduledThreadPoolExecutor) getExecutorService())
+          .setCorePoolSize(parallelThread);
+    }
+  }
+
+  private String getDiskBalancerInfoPath() {
+    String diskBalancerInfoDir =
+        conf.getObject(DiskBalancerConfiguration.class)
+            .getDiskBalancerInfoDir();
+    if (Strings.isNullOrEmpty(diskBalancerInfoDir)) {
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      if (metaDirPath == null) {
+        // this means meta data is not found, in theory should not happen at
+        // this point because should've failed earlier.
+        throw new IllegalArgumentException("Unable to locate meta data " +
+            "directory when getting datanode disk balancer file path");
+      }
+      diskBalancerInfoDir = metaDirPath.toString();
+    }
+    // Use default datanode disk balancer file name for file path
+    return new File(diskBalancerInfoDir,
+        OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT)
+        .toString();
+  }
+
+  /**
+   * Read {@link DiskBalancerInfo} from a local info file.
+   *
+   * @param path DiskBalancerInfo file local path
+   * @return the {@link DiskBalancerInfo} parsed from the file
+   * @throws IOException If the conf file is malformed or other I/O exceptions
+   */
+  private synchronized DiskBalancerInfo readDiskBalancerInfoFile(
+      File path) throws IOException {
+    if (!path.exists()) {
+      throw new IOException("DiskBalancerConf file not found.");
+    }
+    try {
+      return DiskBalancerYaml.readDiskBalancerInfoFile(path);
+    } catch (IOException e) {
+      LOG.warn("Error loading DiskBalancerInfo yaml from {}",
+          path.getAbsolutePath(), e);
+      throw new IOException("Failed to parse DiskBalancerInfo from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Persists a {@link DiskBalancerInfo} to a local file.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  private synchronized void writeDiskBalancerInfoTo(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the DiskBalancerInfo 
file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create DiskBalancerInfo 
directories.");
+      }
+    }
+    DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
+  }
+
+
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public void setVersion(DiskBalancerVersion version) {
+    this.version = version;
+  }
+
+  public DiskBalancerInfo getDiskBalancerInfo() {
+    return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
+        parallelThread, version);
+  }
+
+  public DiskBalancerReportProto getDiskBalancerReportProto() {
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    return builder.setIsRunning(shouldRun)
+        .setBalancedBytes(totalBalancedBytes.get())
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(threshold)
+                .setDiskBandwidthInMB(bandwidthInMB)
+                .setParallelThread(parallelThread)
+                .build())
+        .build();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    if (!shouldRun) {
+      return queue;
+    }
+    metrics.incrRunningLoopCount();
+
+    if (shouldDelay()) {
+      metrics.incrIdleLoopExceedsBandwidthCount();
+      return queue;
+    }
+
+    int availableTaskCount = parallelThread - inProgressTasks.size();
+    if (availableTaskCount <= 0) {
+      LOG.info("No available thread for disk balancer service. " +
+          "Current thread count is {}.", parallelThread);
+      return queue;
+    }
+
+    // TODO: Implementation for choose tasks
+
+    if (queue.isEmpty()) {
+      metrics.incrIdleLoopNoAvailableVolumePairCount();
+    }
+    return queue;
+  }
+
+  private boolean shouldDelay() {
+    // We should wait for next AvailableTime.
+    if (Time.monotonicNow() <= nextAvailableTime.get()) {
+      return true;
+    }
+    // Calculate the next AvailableTime based on bandwidth
+    long bytesBalanced = balancedBytesInLastWindow.getAndSet(0L);
+
+    final int megaByte = 1024 * 1024;
+
+    // converting disk bandwidth in byte/millisec
+    float bytesPerMillisec = bandwidthInMB * megaByte / 1000f;
+    nextAvailableTime.set(Time.monotonicNow() +
+        ((long) (bytesBalanced / bytesPerMillisec)));
+    return false;
+  }
+
+  private class DiskBalancerTask implements BackgroundTask {
+
+    private HddsVolume sourceVolume;
+    private HddsVolume destVolume;
+    private ContainerData containerData;
+
+    DiskBalancerTask(ContainerData containerData,
+        HddsVolume sourceVolume, HddsVolume destVolume) {
+      this.containerData = containerData;
+      this.sourceVolume = sourceVolume;
+      this.destVolume = destVolume;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // TODO: Details of handling tasks
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    @Override
+    public int getPriority() {
+      return BackgroundTask.super.getPriority();
+    }
+
+    private void postCall() {
+      inProgressContainers.remove(containerData.getContainerID());
+      deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) +
+          containerData.getBytesUsed());
+      deltaSizes.put(destVolume, deltaSizes.get(destVolume)
+          - containerData.getBytesUsed());
+    }
+  }
+
+  private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
+    return Paths.get(hddsVolume.getVolumeRootDir())
+        .resolve(DISK_BALANCER_TMP_DIR).resolve(DISK_BALANCER_DIR);
+  }
+
+  public boolean isBalancingContainer(long containerId) {
+    return inProgressContainers.contains(containerId);
+  }
+
+  public DiskBalancerServiceMetrics getMetrics() {
+    return metrics;
+  }
+
+  @VisibleForTesting
+  public void setBalancedBytesInLastWindow(long bytes) {
+    this.balancedBytesInLastWindow.set(bytes);
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (metrics != null) {
+      DiskBalancerServiceMetrics.unRegister();
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
new file mode 100644
index 0000000000..3cf087b334
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Metrics related to DiskBalancer Service running on Datanode.
+ */
+@Metrics(name = "DiskBalancerService Metrics", about = "Metrics related to "
+    + "background disk balancer service on Datanode", context = "dfs")
+public final class DiskBalancerServiceMetrics {
+
+  private static DiskBalancerServiceMetrics instance;
+  public static final String SOURCE_NAME =
+      DiskBalancerServiceMetrics.class.getSimpleName();
+
+  @Metric(about = "The number of successful balance job")
+  private MutableCounterLong successCount;
+
+  @Metric(about = "The total bytes for successfully balanced job.")
+  private MutableCounterLong successBytes;
+
+  @Metric(about = "The number of failed balance job.")
+  private MutableCounterLong failureCount;
+
+  @Metric(about = "The number of total running loop")
+  private MutableCounterLong runningLoopCount;
+
+  @Metric(about = "The number of idle loop that can not generate volume pair.")
+  private MutableCounterLong idleLoopNoAvailableVolumePairCount;
+
+  @Metric(about = "The number of idle loop due to bandwidth limits.")
+  private MutableCounterLong idleLoopExceedsBandwidthCount;
+
+  private DiskBalancerServiceMetrics() {
+  }
+
+  public static DiskBalancerServiceMetrics create() {
+    if (instance == null) {
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      instance = ms.register(SOURCE_NAME, "DiskBalancerService",
+          new DiskBalancerServiceMetrics());
+    }
+
+    return instance;
+  }
+
+  /**
+   * Unregister the metrics instance.
+   */
+  public static void unRegister() {
+    instance = null;
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  public void incrSuccessCount(long count) {
+    this.successCount.incr(count);
+  }
+
+  public void incrSuccessBytes(long bytes) {
+    this.successBytes.incr(bytes);
+  }
+
+  public void incrFailureCount() {
+    this.failureCount.incr();
+  }
+
+  public void incrRunningLoopCount() {
+    this.runningLoopCount.incr();
+  }
+
+  public void incrIdleLoopNoAvailableVolumePairCount() {
+    this.idleLoopNoAvailableVolumePairCount.incr();
+  }
+
+  public void incrIdleLoopExceedsBandwidthCount() {
+    this.idleLoopExceedsBandwidthCount.incr();
+  }
+
+  public long getSuccessCount() {
+    return successCount.value();
+  }
+
+  public long getSuccessBytes() {
+    return successBytes.value();
+  }
+
+  public long getFailureCount() {
+    return failureCount.value();
+  }
+
+  public long getRunningLoopCount() {
+    return runningLoopCount.value();
+  }
+
+  public long getIdleLoopNoAvailableVolumePairCount() {
+    return idleLoopNoAvailableVolumePairCount.value();
+  }
+
+  public long getIdleLoopExceedsBandwidthCount() {
+    return idleLoopExceedsBandwidthCount.value();
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer buffer = new StringBuffer();
+    buffer.append("successCount = " + successCount.value()).append("\t")
+        .append("successBytes = " + successBytes.value()).append("\t")
+        .append("failureCount = " + failureCount.value()).append("\t")
+        .append("idleLoopNoAvailableVolumePairCount = " +
+            idleLoopNoAvailableVolumePairCount.value()).append("\t")
+        .append("idleLoopExceedsBandwidthCount = " +
+            idleLoopExceedsBandwidthCount.value());
+    return buffer.toString();
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java
new file mode 100644
index 0000000000..28da8993c1
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Defines versions for the DiskBalancerService.
+ */
+public enum DiskBalancerVersion {
+  // No constant-specific behavior: a plain constant avoids the anonymous
+  // subclass that an (empty) constant class body would create.
+  ONE(1, "First Version");
+
+  private final int version;
+  private final String description;
+
+  /** Version assumed when none is specified explicitly. */
+  public static final DiskBalancerVersion
+      DEFAULT_VERSION = DiskBalancerVersion.ONE;
+
+  private static final List<DiskBalancerVersion> DISK_BALANCER_VERSIONS =
+      ImmutableList.copyOf(values());
+
+  DiskBalancerVersion(int version, String description) {
+    this.version = version;
+    this.description = description;
+  }
+
+  /**
+   * Looks up a version by its numeric value.
+   *
+   * @return the matching version, or null if no version matches
+   */
+  public static DiskBalancerVersion getDiskBalancerVersion(int version) {
+    for (DiskBalancerVersion diskBalancerVersion :
+        DISK_BALANCER_VERSIONS) {
+      if (diskBalancerVersion.getVersion() == version) {
+        return diskBalancerVersion;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Looks up a version by its enum name, case-insensitively.
+   *
+   * @return the matching version, or null if no version matches
+   */
+  public static DiskBalancerVersion getDiskBalancerVersion(String versionStr) {
+    for (DiskBalancerVersion diskBalancerVersion :
+        DISK_BALANCER_VERSIONS) {
+      if (diskBalancerVersion.toString().equalsIgnoreCase(versionStr)) {
+        return diskBalancerVersion;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return version number.
+   */
+  public int getVersion() {
+    return version;
+  }
+
+  /**
+   * @return description.
+   */
+  public String getDescription() {
+    return description;
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
new file mode 100644
index 0000000000..d16eb65747
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Class for creating diskbalancer.info file in yaml format.
+ */
+
+public final class DiskBalancerYaml {
+
+  private DiskBalancerYaml() {
+    // static helper methods only, no state.
+  }
+
+  /**
+   * Creates a yaml file to store DiskBalancer info.
+   *
+   * @param diskBalancerInfo {@link DiskBalancerInfo}
+   * @param path            Path to diskBalancer.info file
+   */
+  public static void createDiskBalancerInfoFile(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    DumperOptions options = new DumperOptions();
+    options.setPrettyFlow(true);
+    options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW);
+    Yaml yaml = new Yaml(options);
+
+    try (Writer writer = new OutputStreamWriter(
+        new FileOutputStream(path), StandardCharsets.UTF_8)) {
+      yaml.dump(getDiskBalancerInfoYaml(diskBalancerInfo), writer);
+    }
+  }
+
+  /**
+   * Read DiskBalancerConfiguration from file.
+   */
+  public static DiskBalancerInfo readDiskBalancerInfoFile(File path)
+      throws IOException {
+    DiskBalancerInfo diskBalancerInfo;
+
+    try (FileInputStream inputFileStream = new FileInputStream(path)) {
+      Yaml yaml = new Yaml();
+      DiskBalancerInfoYaml diskBalancerInfoYaml;
+      try {
+        diskBalancerInfoYaml =
+            yaml.loadAs(inputFileStream, DiskBalancerInfoYaml.class);
+      } catch (Exception e) {
+        throw new IOException("Unable to parse yaml file.", e);
+      }
+
+      diskBalancerInfo = new DiskBalancerInfo(
+          diskBalancerInfoYaml.isShouldRun(),
+          diskBalancerInfoYaml.getThreshold(),
+          diskBalancerInfoYaml.getBandwidthInMB(),
+          diskBalancerInfoYaml.getParallelThread(),
+          DiskBalancerVersion.getDiskBalancerVersion(
+              diskBalancerInfoYaml.version));
+    }
+
+    return diskBalancerInfo;
+  }
+
+  /**
+   * Datanode DiskBalancer Info to be written to the yaml file.
+   */
+  public static class DiskBalancerInfoYaml {
+    private boolean shouldRun;
+    private double threshold;
+    private long bandwidthInMB;
+    private int parallelThread;
+
+    private int version;
+
+    public DiskBalancerInfoYaml() {
+      // Needed for snake-yaml introspection.
+    }
+
+    private DiskBalancerInfoYaml(boolean shouldRun, double threshold,
+        long bandwidthInMB, int parallelThread, int version) {
+      this.shouldRun = shouldRun;
+      this.threshold = threshold;
+      this.bandwidthInMB = bandwidthInMB;
+      this.parallelThread = parallelThread;
+      this.version = version;
+    }
+
+    public boolean isShouldRun() {
+      return shouldRun;
+    }
+
+    public void setShouldRun(boolean shouldRun) {
+      this.shouldRun = shouldRun;
+    }
+
+    public void setThreshold(double threshold) {
+      this.threshold = threshold;
+    }
+
+    public double getThreshold() {
+      return threshold;
+    }
+
+    public void setBandwidthInMB(long bandwidthInMB) {
+      this.bandwidthInMB = bandwidthInMB;
+    }
+
+    public long getBandwidthInMB() {
+      return this.bandwidthInMB;
+    }
+
+    public void setParallelThread(int parallelThread) {
+      this.parallelThread = parallelThread;
+    }
+
+    public int getParallelThread() {
+      return this.parallelThread;
+    }
+
+    public void setVersion(int version) {
+      this.version = version;
+    }
+
+    public int getVersion() {
+      return this.version;
+    }
+  }
+
+  private static DiskBalancerInfoYaml getDiskBalancerInfoYaml(
+      DiskBalancerInfo diskBalancerInfo) {
+
+    return new DiskBalancerInfoYaml(
+        diskBalancerInfo.isShouldRun(),
+        diskBalancerInfo.getThreshold(),
+        diskBalancerInfo.getBandwidthInMB(),
+        diskBalancerInfo.getParallelThread(),
+        diskBalancerInfo.getVersion().getVersion());
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java
new file mode 100644
index 0000000000..d8ba71eb9d
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ This package contains classes related to the DiskBalancer service.
+ */
+package org.apache.hadoop.ozone.container.diskbalancer;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index f81b58e317..a79a160e81 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -32,6 +32,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
@@ -56,6 +57,8 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import 
org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService;
 import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
@@ -118,6 +121,7 @@ public class OzoneContainer {
   private final StaleRecoveringContainerScrubbingService
       recoveringContainerScrubbingService;
   private final GrpcTlsConfig tlsClientConfig;
+  private final DiskBalancerService diskBalancerService;
   private final AtomicReference<InitializingStatus> initializingStatus;
   private final ReplicationServer replicationServer;
   private DatanodeDetails datanodeDetails;
@@ -239,6 +243,15 @@ public class OzoneContainer {
             datanodeDetails.threadNamePrefix(),
             context.getParent().getReconfigurationHandler());
 
+    Duration diskBalancerSvcInterval = conf.getObject(
+        DiskBalancerConfiguration.class).getDiskBalancerInterval();
+    Duration diskBalancerSvcTimeout = conf.getObject(
+        DiskBalancerConfiguration.class).getDiskBalancerTimeout();
+    diskBalancerService =
+        new DiskBalancerService(this, diskBalancerSvcInterval.toMillis(),
+            diskBalancerSvcTimeout.toMillis(), TimeUnit.MILLISECONDS, 1,
+            config);
+
     Duration recoveringContainerScrubbingSvcInterval = conf.getObject(
         DatanodeConfiguration.class).getRecoveringContainerScrubInterval();
 
@@ -468,6 +481,7 @@ public class OzoneContainer {
     hddsDispatcher.init();
     hddsDispatcher.setClusterId(clusterId);
     blockDeletingService.start();
+    diskBalancerService.start();
     recoveringContainerScrubbingService.start();
 
     // mark OzoneContainer as INITIALIZED.
@@ -493,6 +507,7 @@ public class OzoneContainer {
       dbVolumeSet.shutdown();
     }
     blockDeletingService.shutdown();
+    diskBalancerService.shutdown();
     recoveringContainerScrubbingService.shutdown();
     ContainerMetrics.remove();
   }
@@ -592,7 +607,14 @@ public class OzoneContainer {
   }
 
   public DiskBalancerReportProto getDiskBalancerReport() {
-    // TODO: Return real disk balancer report
-    return null;
+    return diskBalancerService.getDiskBalancerReportProto();
+  }
+
+  public DiskBalancerInfo getDiskBalancerInfo() {
+    return diskBalancerService.getDiskBalancerInfo();
+  }
+
+  public DiskBalancerService getDiskBalancerService() {
+    return diskBalancerService;
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
index ea97f34ca0..1780877787 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 
@@ -29,12 +30,12 @@ import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
  */
 public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> {
 
-  private final boolean shouldRun;
+  private final HddsProtos.DiskBalancerOpType opType;
   private final DiskBalancerConfiguration diskBalancerConfiguration;
 
-  public DiskBalancerCommand(final boolean shouldRun,
+  public DiskBalancerCommand(final HddsProtos.DiskBalancerOpType opType,
       final DiskBalancerConfiguration diskBalancerConfiguration) {
-    this.shouldRun = shouldRun;
+    this.opType = opType;
     this.diskBalancerConfiguration = diskBalancerConfiguration;
   }
 
@@ -50,22 +51,26 @@ public class DiskBalancerCommand extends 
SCMCommand<DiskBalancerCommandProto> {
 
   @Override
   public DiskBalancerCommandProto getProto() {
-    return DiskBalancerCommandProto.newBuilder()
-        .setShouldRun(shouldRun)
-        .setDiskBalancerConf(diskBalancerConfiguration.toProtobufBuilder())
-        .build();
+    DiskBalancerCommandProto.Builder builder = DiskBalancerCommandProto
+        .newBuilder().setOpType(opType);
+    // Stop command doesn't carry a diskBalancerConf
+    if (diskBalancerConfiguration != null) {
+      builder.setDiskBalancerConf(
+          diskBalancerConfiguration.toProtobufBuilder());
+    }
+    return builder.build();
   }
 
   public static DiskBalancerCommand getFromProtobuf(DiskBalancerCommandProto
       diskbalancerCommandProto, ConfigurationSource configuration) {
     Preconditions.checkNotNull(diskbalancerCommandProto);
-    return new DiskBalancerCommand(diskbalancerCommandProto.getShouldRun(),
+    return new DiskBalancerCommand(diskbalancerCommandProto.getOpType(),
         DiskBalancerConfiguration.fromProtobuf(
             diskbalancerCommandProto.getDiskBalancerConf(), configuration));
   }
 
-  public boolean isShouldRun() {
-    return shouldRun;
+  public HddsProtos.DiskBalancerOpType getOpType() {
+    return opType;
   }
 
   public DiskBalancerConfiguration getDiskBalancerConfiguration() {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
new file mode 100644
index 0000000000..0d87ac3420
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A test class implementation for {@link DiskBalancerService}.
+ */
+public class DiskBalancerServiceTestImpl extends DiskBalancerService {
+
+  // the service timeout
+  private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0;
+
+  // tests only
+  private CountDownLatch latch;
+  private Thread testingThread;
+  private AtomicInteger numOfProcessed = new AtomicInteger(0);
+
+  public DiskBalancerServiceTestImpl(OzoneContainer container,
+      int serviceInterval, ConfigurationSource conf, int threadCount)
+      throws IOException {
+    super(container, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS,
+        TimeUnit.MILLISECONDS, threadCount, conf);
+  }
+
+  public void runBalanceTasks() {
+    if (latch.getCount() > 0) {
+      this.latch.countDown();
+    } else {
+      throw new IllegalStateException("Count already reaches zero");
+    }
+  }
+
+  public boolean isStarted() {
+    return latch != null && testingThread.isAlive();
+  }
+
+  public int getTimesOfProcessed() {
+    return numOfProcessed.get();
+  }
+
+  // Override the implementation to start a single on-call control thread.
+  @Override
+  public void start() {
+    PeriodicalTask svc = new PeriodicalTask();
+    // In test mode, relies on a latch countdown to runDeletingTasks tasks.
+    Runnable r = () -> {
+      while (true) {
+        latch = new CountDownLatch(1);
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          break;
+        }
+        Future<?> future = this.getExecutorService().submit(svc);
+        try {
+          // for tests, we only wait for 3s for completion
+          future.get(3000, TimeUnit.SECONDS);
+          numOfProcessed.incrementAndGet();
+        } catch (Exception e) {
+          e.printStackTrace();
+          return;
+        }
+      }
+    };
+
+    testingThread = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .build()
+        .newThread(r);
+    testingThread.start();
+  }
+
+  @Override
+  public void shutdown() {
+    testingThread.interrupt();
+    super.shutdown();
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
new file mode 100644
index 0000000000..c03f8cab75
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.TestBlockDeletingService;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This is a test class for DiskBalancerService.
+ */
+public class TestDiskBalancerService {
+  // Per-test root directory holding the mock volumes and metadata.
+  private File testRoot;
+  private String scmId;
+  private String datanodeUuid;
+  private OzoneConfiguration conf;
+
+  // Layout/schema version under test, injected by ContainerTestVersionInfo.
+  // NOTE(review): layout is stored but not referenced in this class.
+  private final ContainerLayoutVersion layout;
+  private final String schemaVersion;
+  private MutableVolumeSet volumeSet;
+
+  public TestDiskBalancerService(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+  }
+
+
+  @BeforeEach
+  public void init() throws IOException {
+    testRoot = GenericTestUtils
+        .getTestDir(TestBlockDeletingService.class.getSimpleName());
+    if (testRoot.exists()) {
+      FileUtils.cleanDirectory(testRoot);
+    }
+    scmId = UUID.randomUUID().toString();
+    // Two data volumes under testRoot so the balancer has a volume pair.
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        generateVolumeLocation(testRoot.getAbsolutePath(), 2));
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
+    // Mock space usage so capacity does not depend on the real filesystem.
+    conf.set("hdds.datanode.du.factory.classname",
+        "org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory$HalfTera");
+    datanodeUuid = UUID.randomUUID().toString();
+    volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+  }
+
+  @AfterEach
+  public void cleanup() throws IOException {
+    BlockUtils.shutdownCache(conf);
+    FileUtils.deleteDirectory(testRoot);
+  }
+
+  @Timeout(30)
+  @ContainerTestVersionInfo.ContainerTest
+  public void testUpdateService() throws Exception {
+    // Increase volume's usedBytes
+    for (StorageVolume volume : volumeSet.getVolumeMap().values()) {
+      volume.incrementUsedSpace(volume.getCapacity() / 2);
+    }
+
+    ContainerSet containerSet = new ContainerSet(1000);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    KeyValueHandler keyValueHandler =
+        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
+            metrics, c -> {
+        });
+    DiskBalancerServiceTestImpl svc =
+        getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1);
+
+    // Set a low bandwidth to delay job
+    svc.setShouldRun(true);
+    svc.setThreshold(10.0d);
+    svc.setBandwidthInMB(1L);
+    svc.setParallelThread(5);
+    svc.setVersion(DiskBalancerVersion.DEFAULT_VERSION);
+
+    svc.start();
+
+    // The service reflects the initially configured values.
+    assertTrue(svc.getDiskBalancerInfo().isShouldRun());
+    assertEquals(10, svc.getDiskBalancerInfo().getThreshold(), 0.0);
+    assertEquals(1, svc.getDiskBalancerInfo().getBandwidthInMB());
+    assertEquals(5, svc.getDiskBalancerInfo().getParallelThread());
+
+    // Refreshing with a new DiskBalancerInfo updates every field.
+    DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10);
+    svc.refresh(newInfo);
+
+    assertFalse(svc.getDiskBalancerInfo().isShouldRun());
+    assertEquals(20, svc.getDiskBalancerInfo().getThreshold(), 0.0);
+    assertEquals(5, svc.getDiskBalancerInfo().getBandwidthInMB());
+    assertEquals(10, svc.getDiskBalancerInfo().getParallelThread());
+
+    svc.shutdown();
+  }
+
+  // Builds a comma-separated list "<base>/vol0,<base>/vol1,...".
+  private String generateVolumeLocation(String base, int volumeCount) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < volumeCount; i++) {
+      sb.append(base + "/vol" + i);
+      sb.append(",");
+    }
+    return sb.substring(0, sb.length() - 1);
+  }
+
+  // Constructs the test service against a mocked OzoneContainer.
+  private DiskBalancerServiceTestImpl getDiskBalancerService(
+      ContainerSet containerSet, ConfigurationSource config,
+      KeyValueHandler keyValueHandler, ContainerController controller,
+      int threadCount) throws IOException {
+    OzoneContainer ozoneContainer =
+        mockDependencies(containerSet, keyValueHandler, controller);
+    return new DiskBalancerServiceTestImpl(ozoneContainer, 1000, config,
+        threadCount);
+  }
+
+  // Mocks only the OzoneContainer collaborators the service touches.
+  private OzoneContainer mockDependencies(ContainerSet containerSet,
+      KeyValueHandler keyValueHandler, ContainerController controller) {
+    OzoneContainer ozoneContainer = mock(OzoneContainer.class);
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    when(ozoneContainer.getWriteChannel()).thenReturn(null);
+    ContainerDispatcher dispatcher = mock(ContainerDispatcher.class);
+    when(ozoneContainer.getDispatcher()).thenReturn(dispatcher);
+    when(dispatcher.getHandler(any())).thenReturn(keyValueHandler);
+    when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet);
+    when(ozoneContainer.getController()).thenReturn(controller);
+    return ozoneContainer;
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
new file mode 100644
index 0000000000..3b61255b0b
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT;
+
+/**
+ * Tests to test DiskBalancer's YAML operation.
+ */
+public class TestDiskBalancerYaml {
+  @Test
+  public void testCreateYaml() throws IOException {
+    boolean shouldRun = true;
+    double threshold = 10;
+    long bandwidthInMB = 100;
+    int parallelThread = 5;
+    DiskBalancerVersion version = DiskBalancerVersion.DEFAULT_VERSION;
+
+    File file = new File(GenericTestUtils.getTestDir(),
+        OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT);
+
+    DiskBalancerInfo info = new DiskBalancerInfo(shouldRun, threshold,
+        bandwidthInMB, parallelThread, version);
+
+    DiskBalancerYaml.createDiskBalancerInfoFile(info, file);
+
+    DiskBalancerInfo newInfo = DiskBalancerYaml.readDiskBalancerInfoFile(file);
+
+    Assertions.assertEquals(info, newInfo);
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 91e6f5578d..c49062a410 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -50,7 +50,6 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
-import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
@@ -1168,7 +1167,7 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
 
     DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
         DatanodeDiskBalancerOpRequestProto.newBuilder()
-            .setOpType(DatanodeDiskBalancerOpType.start)
+            .setOpType(HddsProtos.DiskBalancerOpType.START)
             .setConf(confBuilder);
     hosts.ifPresent(requestBuilder::addAllHosts);
 
@@ -1190,7 +1189,7 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
       throws IOException {
     DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
         DatanodeDiskBalancerOpRequestProto.newBuilder()
-            .setOpType(DatanodeDiskBalancerOpType.stop);
+            .setOpType(HddsProtos.DiskBalancerOpType.STOP);
     hosts.ifPresent(requestBuilder::addAllHosts);
 
     DatanodeDiskBalancerOpResponseProto response =
@@ -1219,7 +1218,7 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
 
     DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
         DatanodeDiskBalancerOpRequestProto.newBuilder()
-            .setOpType(DatanodeDiskBalancerOpType.update)
+            .setOpType(HddsProtos.DiskBalancerOpType.UPDATE)
             .setConf(confBuilder);
     hosts.ifPresent(requestBuilder::addAllHosts);
 
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index fca2232921..7e660af307 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -382,14 +382,8 @@ message DatanodeDiskBalancerInfoResponseProto {
   repeated DatanodeDiskBalancerInfoProto info = 1;
 }
 
-enum DatanodeDiskBalancerOpType{
-  start = 1;
-  stop = 2;
-  update = 3;
-}
-
 message DatanodeDiskBalancerOpRequestProto {
-  required DatanodeDiskBalancerOpType opType = 1;
+  required DiskBalancerOpType opType = 1;
   repeated string hosts = 2;
   optional DiskBalancerConfigurationProto conf = 3;
 }
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index a53d035109..430a1a7b0e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -534,6 +534,12 @@ enum DiskBalancerRunningStatus {
     UNKNOWN = 3;
 }
 
// Operation requested of a datanode's DiskBalancer.
enum DiskBalancerOpType {
    START = 1;
    STOP = 2;
    UPDATE = 3;
}
+
 message DatanodeDiskBalancerInfoProto {
     required DatanodeDetailsProto node = 1;
     required double currentVolumeDensitySum = 2;
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 7a265dd0ba..f2f7d537a2 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -452,7 +452,7 @@ message DatanodeDetailsAndReplicaIndexProto {
 This command asks the datanode to update diskBalancer status
  */
 message DiskBalancerCommandProto {
-  required bool shouldRun = 1;
+  required DiskBalancerOpType opType = 1;
   optional DiskBalancerConfigurationProto diskBalancerConf = 2;
 }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index c6114c52f5..1af9bfde4c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -24,10 +24,15 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,6 +131,118 @@ public class DiskBalancerManager {
     }
   }
 
+  /**
+   * Send startDiskBalancer command to datanodes.
+   * If hosts is not specified, send commands to all healthy datanodes.
+   * @param threshold new configuration of threshold
+   * @param bandwidthInMB new configuration of bandwidthInMB
+   * @param parallelThread new configuration of parallelThread
+   * @param hosts Datanodes that command will apply on
+   * @return Possible errors
+   * @throws IOException
+   */
+  public List<DatanodeAdminError> startDiskBalancer(
+      Optional<Double> threshold, Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
+      throws IOException {
+    List<DatanodeDetails> dns;
+    if (hosts.isPresent()) {
+      dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+          useHostnames);
+    } else {
+      dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
+    }
+
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeDetails dn : dns) {
+      try {
+        if (nodeManager.getNodeStatus(dn).isHealthy()) {
+          errors.add(new DatanodeAdminError(dn.getHostName(),
+              "Datanode not in healthy state"));
+          continue;
+        }
+        // If command doesn't have configuration change, then we reuse the
+        // latest configuration reported from Datnaodes
+        DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn,
+            threshold, bandwidthInMB, parallelThread);
+        DiskBalancerCommand command = new DiskBalancerCommand(
+            HddsProtos.DiskBalancerOpType.START, updateConf);
+        sendCommand(dn, command);
+      } catch (Exception e) {
+        errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage()));
+      }
+    }
+    return errors;
+  }
+
  /**
   * Send stopDiskBalancer command to datanodes.
   * If hosts is not specified, the command is sent to all in-service
   * healthy datanodes (not every registered datanode — only
   * {@code NodeStatus.inServiceHealthy()} nodes are selected below).
   * @param hosts Datanodes that command will apply on
   * @return Possible errors
   * @throws IOException if the supplied hosts cannot be resolved to datanodes
   */
  public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
      throws IOException {
    List<DatanodeDetails> dns;
    if (hosts.isPresent()) {
      dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
          useHostnames);
    } else {
      dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
    }

    List<DatanodeAdminError> errors = new ArrayList<>();
    for (DatanodeDetails dn : dns) {
      try {
        // STOP carries no configuration payload, hence the null conf.
        DiskBalancerCommand command = new DiskBalancerCommand(
            HddsProtos.DiskBalancerOpType.STOP, null);
        sendCommand(dn, command);
      } catch (Exception e) {
        errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage()));
      }
    }
    return errors;
  }
+
+
  /**
   * Send update DiskBalancerConf command to datanodes.
   * If hosts is not specified, send commands to all in-service healthy
   * datanodes.
   * @param threshold new configuration of threshold
   * @param bandwidthInMB new configuration of bandwidthInMB
   * @param parallelThread new configuration of parallelThread
   * @param hosts Datanodes that command will apply on
   * @return Possible errors
   * @throws IOException if the supplied hosts cannot be resolved to datanodes
   */
  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
      Optional<Double> threshold, Optional<Long> bandwidthInMB,
      Optional<Integer> parallelThread, Optional<List<String>> hosts)
      throws IOException {
    List<DatanodeDetails> dns;
    if (hosts.isPresent()) {
      dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
          useHostnames);
    } else {
      dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
    }

    List<DatanodeAdminError> errors = new ArrayList<>();
    for (DatanodeDetails dn : dns) {
      try {
        // If command doesn't have configuration change, then we reuse the
        // latest configuration reported from Datanodes
        DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn,
            threshold, bandwidthInMB, parallelThread);
        DiskBalancerCommand command = new DiskBalancerCommand(
            HddsProtos.DiskBalancerOpType.UPDATE, updateConf);
        sendCommand(dn, command);
      } catch (Exception e) {
        errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage()));
      }
    }
    return errors;
  }
+
   private boolean shouldReturnDatanode(
       HddsProtos.DiskBalancerRunningStatus status,
       DatanodeDetails datanodeDetails) {
@@ -218,6 +335,30 @@ public class DiskBalancerManager {
     }
   }
 
  /**
   * Merges any explicitly supplied settings into the datanode's last
   * reported DiskBalancerConfiguration (or a fresh default configuration
   * if the datanode has never reported one). Absent optionals leave the
   * corresponding reported value untouched.
   *
   * NOTE(review): when an entry exists in statusMap, the setters below
   * mutate that cached configuration object in place rather than a copy —
   * confirm this sharing with the status cache is intended.
   */
  private DiskBalancerConfiguration attachDiskBalancerConf(
      DatanodeDetails dn, Optional<Double> threshold,
      Optional<Long> bandwidthInMB, Optional<Integer> parallelThread) {
    DiskBalancerConfiguration baseConf = statusMap.containsKey(dn) ?
        statusMap.get(dn).getDiskBalancerConfiguration() :
        new DiskBalancerConfiguration();
    threshold.ifPresent(baseConf::setThreshold);
    bandwidthInMB.ifPresent(baseConf::setDiskBandwidthInMB);
    parallelThread.ifPresent(baseConf::setParallelThread);
    return baseConf;
  }
+
+  private void sendCommand(DatanodeDetails dn, DiskBalancerCommand command) {
+    try {
+      command.setTerm(scmContext.getTermOfLeader());
+    } catch (NotLeaderException nle) {
+      LOG.warn("Skip sending DiskBalancerCommand for Datanode {}," +
+          " since not leader SCM.", dn.getUuidString());
+      return;
+    }
+    scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+        new CommandForDatanode<>(dn.getUuid(), command));
+  }
+
   @VisibleForTesting
   public Map<DatanodeDetails, DiskBalancerStatus> getStatusMap() {
     return statusMap;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 60b17d1fcb..ec7b486795 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1347,21 +1347,21 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
           throws IOException {
     List<DatanodeAdminError> errors;
     switch (request.getOpType()) {
-    case start:
+    case START:
       errors = impl.startDiskBalancer(
               Optional.of(request.getConf().getThreshold()),
               Optional.of(request.getConf().getDiskBandwidthInMB()),
               Optional.of(request.getConf().getParallelThread()),
               Optional.of(request.getHostsList()));
       break;
-    case update:
+    case UPDATE:
       errors = impl.updateDiskBalancerConfiguration(
               Optional.of(request.getConf().getThreshold()),
               Optional.of(request.getConf().getDiskBandwidthInMB()),
               Optional.of(request.getConf().getParallelThread()),
               Optional.of(request.getHostsList()));
       break;
-    case stop:
+    case STOP:
       errors = impl.stopDiskBalancer(Optional.of(request.getHostsList()));
       break;
     default:
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 1704c61515..c5ab07154a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1274,6 +1274,7 @@ public class SCMClientProtocolServer implements
         ContainerID.valueOf(startContainerID), count, state);
   }
 
+  @Override
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
       int count, int clientVersion) throws IOException {
     // check admin authorisation
@@ -1309,15 +1310,27 @@ public class SCMClientProtocolServer implements
   public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
       Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException {
-    // TODO: Send message to datanodes
-    return new ArrayList<DatanodeAdminError>();
+    try {
+      getScm().checkAdminAccess(getRemoteUser(), false);
+    } catch (IOException e) {
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+
+    return scm.getDiskBalancerManager()
+        .startDiskBalancer(threshold, bandwidthInMB, parallelThread, hosts);
   }
 
   @Override
   public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> 
hosts)
       throws IOException {
-    // TODO: Send message to datanodes
-    return new ArrayList<DatanodeAdminError>();
+    try {
+      getScm().checkAdminAccess(getRemoteUser(), false);
+    } catch (IOException e) {
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+    return scm.getDiskBalancerManager().stopDiskBalancer(hosts);
   }
 
 
@@ -1326,8 +1339,15 @@ public class SCMClientProtocolServer implements
       Optional<Double> threshold, Optional<Long> bandwidthInMB,
       Optional<Integer> parallelThread, Optional<List<String>> hosts)
       throws IOException {
-    // TODO: Send message to datanodes
-    return new ArrayList<DatanodeAdminError>();
+    try {
+      getScm().checkAdminAccess(getRemoteUser(), false);
+    } catch (IOException e) {
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+
+    return scm.getDiskBalancerManager().updateDiskBalancerConfiguration(
+        threshold, bandwidthInMB, parallelThread, hosts);
   }
 
   /**
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
index f58c7a1f84..fc87b145a7 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
@@ -86,4 +86,4 @@ public class DiskBalancerStartSubcommand extends 
ScmSubcommand {
   public void setAllHosts(boolean allHosts) {
     this.commonOptions.setAllHosts(allHosts);
   }
-}
\ No newline at end of file
+}
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
index 28ed4fd325..744d007539 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
@@ -84,4 +84,4 @@ public class DiskBalancerUpdateSubcommand extends 
ScmSubcommand {
   public void setAllHosts(boolean allHosts) {
     this.commonOptions.setAllHosts(allHosts);
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to