This is an automated email from the ASF dual-hosted git repository.

yiyang0203 pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 9014a79df83946c8177a3ee26daab25d01fea519
Author: Symious <[email protected]>
AuthorDate: Wed Sep 7 12:19:57 2022 +0800

    HDDS-7155. [DiskBalancer] Create interface between SCM and DN (#3701)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   4 +
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |  21 ++--
 .../protocol/StorageContainerLocationProtocol.java |  17 +--
 .../scm/storage/DiskBalancerConfiguration.java     |  39 +++----
 .../common/src/main/resources/ozone-default.xml    |   8 ++
 .../common/report/DiskBalancerReportPublisher.java |  66 ++++++++++++
 .../common/report/ReportPublisherFactory.java      |   3 +
 .../common/statemachine/DatanodeStateMachine.java  |   2 +
 .../common/statemachine/StateContext.java          |  13 +++
 .../commandhandler/DiskBalancerCommandHandler.java | 117 +++++++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |   7 ++
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   5 +
 .../protocol/commands/DiskBalancerCommand.java     |  74 +++++++++++++
 .../common/report/TestReportPublisher.java         |  42 ++++++++
 .../common/report/TestReportPublisherFactory.java  |  12 +++
 ...inerLocationProtocolClientSideTranslatorPB.java |  81 ++++++++++++--
 .../src/main/proto/ScmAdminProtocol.proto          |  20 ++++
 .../interface-client/src/main/proto/hdds.proto     |  10 +-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  17 +++
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   9 ++
 .../hadoop/hdds/scm/node/DiskBalancerManager.java  | 109 +++++++++++++------
 .../hdds/scm/node/DiskBalancerReportHandler.java   |  65 ++++++++++++
 .../hadoop/hdds/scm/node/DiskBalancerStatus.java   |   2 -
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   8 ++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  17 +++
 ...inerLocationProtocolServerSideTranslatorPB.java |  51 ++++++++-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  22 ++--
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |  23 ++++
 .../hdds/scm/server/StorageContainerManager.java   |   5 +
 .../hadoop/hdds/scm/container/MockNodeManager.java |  51 ++++++++-
 .../hdds/scm/container/SimpleMockNodeManager.java  |   5 +
 .../hdds/scm/node/TestDiskBalancerManager.java     |  34 ++++++
 .../testutils/ReplicationNodeManagerMock.java      |   6 ++
 .../hdds/scm/cli/ContainerOperationClient.java     |  29 ++---
 .../hadoop/ozone/scm/node/TestDiskBalancer.java    |   4 +
 35 files changed, 902 insertions(+), 96 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index d0c31bf288..b3b5f103d1 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -46,6 +46,10 @@ public final class HddsConfigKeys {
       "hdds.pipeline.report.interval";
   public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
       "60s";
+  public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL =
+      "hdds.disk.balancer.report.interval";
+  public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT =
+      "60s";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 478d08dff0..2fd91fbc43 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -456,7 +456,8 @@ public interface ScmClient extends Closeable {
   String getMetrics(String query) throws IOException;
 
   /**
-   * Get DiskBalancer status.
+   * Get DiskBalancer report.
+   * REPORT shows the current volume density of datanodes.
    * @param count top datanodes that need balancing
    * @return List of DatanodeDiskBalancerInfo.
    * @throws IOException
@@ -466,33 +467,39 @@ public interface ScmClient extends Closeable {
 
   /**
    * Get DiskBalancer status.
+   * STATUS shows the running status of DiskBalancer on datanodes.
    * @param hosts If hosts is not null, return status of hosts; If hosts is
    *              null, return status of all datanodes in balancing.
    * @return List of DatanodeDiskBalancerInfo.
    * @throws IOException
    */
   List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts) throws IOException;
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+      throws IOException;
 
   /**
    * Start DiskBalancer.
    */
-  void startDiskBalancer(
+  List<DatanodeAdminError> startDiskBalancer(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 
   /**
    * Stop DiskBalancer.
    */
-  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+  List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException;
 
 
   /**
    * Update DiskBalancer Configuration.
    */
-  void updateDiskBalancerConfiguration(
+  List<DatanodeAdminError> updateDiskBalancerConfiguration(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidth,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 2a7969049d..651c9905d7 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -484,26 +484,31 @@ public interface StorageContainerLocationProtocol extends 
Closeable {
    * Get DiskBalancer status.
    */
   List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException;
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus,
+      int clientVersion) throws IOException;
 
   /**
    * Start DiskBalancer.
    */
-  void startDiskBalancer(
+  List<DatanodeAdminError> startDiskBalancer(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 
   /**
    * Stop DiskBalancer.
    */
-  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+  List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException;
 
   /**
    * Update DiskBalancer Configuration.
    */
-  void updateDiskBalancerConfiguration(
+  List<DatanodeAdminError> updateDiskBalancerConfiguration(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
index 54709e4c17..e460e0b6eb 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@ public final class DiskBalancerConfiguration {
   private static final Logger LOG =
       LoggerFactory.getLogger(DiskBalancerConfiguration.class);
 
-  @Config(key = "volume.density.threshold", type = ConfigType.AUTO,
+  @Config(key = "volume.density.threshold", type = ConfigType.DOUBLE,
       defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
       description = "Threshold is a percentage in the range of 0 to 100. A " +
           "datanode is considered balanced if for each volume, the " +
@@ -45,12 +45,12 @@ public final class DiskBalancerConfiguration {
           " of the entire datanode) no more than the threshold.")
   private double threshold = 10d;
 
-  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO,
+  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.LONG,
       defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
       description = "The max balance speed.")
-  private double diskBandwidth = 10;
+  private long diskBandwidthInMB = 10;
 
-  @Config(key = "parallel.thread", type = ConfigType.AUTO,
+  @Config(key = "parallel.thread", type = ConfigType.INT,
       defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
       description = "The max parallel balance thread count.")
   private int parallelThread = 5;
@@ -86,21 +86,21 @@ public final class DiskBalancerConfiguration {
    *
    * @return max disk bandwidth per second
    */
-  public double getDiskBandwidth() {
-    return diskBandwidth;
+  public double getDiskBandwidthInMB() {
+    return diskBandwidthInMB;
   }
 
   /**
    * Sets the disk bandwidth value for Disk Balancer.
    *
-   * @param diskBandwidth the bandwidth to control balance speed
+   * @param diskBandwidthInMB the bandwidth to control balance speed
    */
-  public void setDiskBandwidth(double diskBandwidth) {
-    if (diskBandwidth <= 0d) {
+  public void setDiskBandwidthInMB(long diskBandwidthInMB) {
+    if (diskBandwidthInMB <= 0L) {
       throw new IllegalArgumentException(
-          "diskBandwidth must be a value larger than 0.");
+          "diskBandwidthInMB must be a value larger than 0.");
     }
-    this.diskBandwidth = diskBandwidth;
+    this.diskBandwidthInMB = diskBandwidthInMB;
   }
 
   /**
@@ -124,6 +124,7 @@ public final class DiskBalancerConfiguration {
     }
     this.parallelThread = parallelThread;
   }
+
   @Override
   public String toString() {
     return String.format("Disk Balancer Configuration values:%n" +
@@ -132,7 +133,7 @@ public final class DiskBalancerConfiguration {
             "%-50s %s%n" +
             "%-50s %s%n",
             "Key", "Value",
-        "Threshold", threshold, "Max disk bandwidth", diskBandwidth,
+        "Threshold", threshold, "Max disk bandwidth", diskBandwidthInMB,
         "Parallel Thread", parallelThread);
   }
 
@@ -141,21 +142,21 @@ public final class DiskBalancerConfiguration {
         HddsProtos.DiskBalancerConfigurationProto.newBuilder();
 
     builder.setThreshold(threshold)
-        .setDiskBandwidth(diskBandwidth)
+        .setDiskBandwidthInMB(diskBandwidthInMB)
         .setParallelThread(parallelThread);
     return builder;
   }
 
-  static DiskBalancerConfiguration fromProtobuf(
+  public static DiskBalancerConfiguration fromProtobuf(
       @Nonnull HddsProtos.DiskBalancerConfigurationProto proto,
-      @Nonnull OzoneConfiguration ozoneConfiguration) {
+      @Nonnull ConfigurationSource configurationSource) {
     DiskBalancerConfiguration config =
-        ozoneConfiguration.getObject(DiskBalancerConfiguration.class);
+        configurationSource.getObject(DiskBalancerConfiguration.class);
     if (proto.hasThreshold()) {
       config.setThreshold(proto.getThreshold());
     }
-    if (proto.hasDiskBandwidth()) {
-      config.setDiskBandwidth(proto.getDiskBandwidth());
+    if (proto.hasDiskBandwidthInMB()) {
+      config.setDiskBandwidthInMB(proto.getDiskBandwidthInMB());
     }
     if (proto.hasParallelThread()) {
       config.setParallelThread(proto.getParallelThread());
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index fc873f20af..e59e29257c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -386,6 +386,14 @@
       datanode periodically send pipeline report to SCM. Unit could be
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
+  <property>
+    <name>hdds.disk.balancer.report.interval</name>
+    <value>60000ms</value>
+    <tag>OZONE, CONTAINER, DISK_BALANCER</tag>
+    <description>Time interval of the datanode to send disk balancer report. 
Each
+      datanode periodically sends disk balancer report to SCM. Unit could be
+      defined with postfix (ns,ms,s,m,h,d)</description>
+  </property>
 
 
   <property>
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java
new file mode 100644
index 0000000000..2bb78c1ee6
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT;
+
+
+/**
+ * Publishes DiskBalancer report which will be sent to SCM as part of 
heartbeat.
+ * DiskBalancer Report consists of the following information:
+ *   - isBalancerRunning
+ *   - balancedBytes
+ *   - DiskBalancerConfiguration
+ */
+public class DiskBalancerReportPublisher extends
+    ReportPublisher<DiskBalancerReportProto> {
+
+  private Long diskBalancerReportInterval = null;
+
+  @Override
+  protected long getReportFrequency() {
+    if (diskBalancerReportInterval == null) {
+      diskBalancerReportInterval = getConf().getTimeDuration(
+          HDDS_DISK_BALANCER_REPORT_INTERVAL,
+          HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+
+      long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+          getConf());
+
+      Preconditions.checkState(
+          heartbeatFrequency <= diskBalancerReportInterval,
+              HDDS_DISK_BALANCER_REPORT_INTERVAL +
+              " cannot be configured lower than heartbeat frequency " +
+                  heartbeatFrequency + ".");
+    }
+    return diskBalancerReportInterval;
+  }
+
+  @Override
+  protected DiskBalancerReportProto getReport() {
+    return getContext().getParent().getContainer().getDiskBalancerReport();
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index 3be1b5e077..19d2806919 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 
@@ -55,6 +56,8 @@ public class ReportPublisherFactory {
     report2publisher.put(PipelineReportsProto.class,
             PipelineReportPublisher.class);
     report2publisher.put(CRLStatusReport.class, 
CRLStatusReportPublisher.class);
+    report2publisher.put(DiskBalancerReportProto.class,
+        DiskBalancerReportPublisher.class);
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 9677144054..1623c1ae28 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -55,6 +55,7 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Comm
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CreatePipelineCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
+import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DiskBalancerCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
@@ -258,6 +259,7 @@ public class DatanodeStateMachine implements Closeable {
             supervisor::nodeStateUpdated))
         .addHandler(new FinalizeNewLayoutVersionCommandHandler())
         .addHandler(new RefreshVolumeUsageCommandHandler())
+        .addHandler(new DiskBalancerCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 6bbf8e4794..70fafa2c84 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -98,6 +99,9 @@ public class StateContext {
   @VisibleForTesting
   static final String CRL_STATUS_REPORT_PROTO_NAME =
       CRLStatusReport.getDescriptor().getFullName();
+  @VisibleForTesting
+  static final String DISK_BALANCER_REPORT_PROTO_NAME =
+      DiskBalancerReportProto.getDescriptor().getFullName();
 
   static final Logger LOG =
       LoggerFactory.getLogger(StateContext.class);
@@ -113,6 +117,7 @@ public class StateContext {
   private final AtomicReference<Message> nodeReport;
   private final AtomicReference<Message> pipelineReports;
   private final AtomicReference<Message> crlStatusReport;
+  private final AtomicReference<Message> diskBalancerReport;
   // Incremental reports are queued in the map below
   private final Map<InetSocketAddress, List<Message>>
       incrementalReportsQueue;
@@ -180,6 +185,7 @@ public class StateContext {
     nodeReport = new AtomicReference<>();
     pipelineReports = new AtomicReference<>();
     crlStatusReport = new AtomicReference<>(); // Certificate Revocation List
+    diskBalancerReport = new AtomicReference<>();
     endpoints = new HashSet<>();
     containerActions = new HashMap<>();
     pipelineActions = new HashMap<>();
@@ -206,6 +212,8 @@ public class StateContext {
     type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
     fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
     type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
+    fullReportTypeList.add(DISK_BALANCER_REPORT_PROTO_NAME);
+    type2Reports.put(DISK_BALANCER_REPORT_PROTO_NAME, diskBalancerReport);
   }
 
   /**
@@ -958,6 +966,11 @@ public class StateContext {
     return crlStatusReport.get();
   }
 
+  @VisibleForTesting
+  public Message getDiskBalancerReport() {
+    return diskBalancerReport.get();
+  }
+
   public void configureReconHeartbeatFrequency() {
     reconHeartbeatFrequency.set(getReconHeartbeatInterval(conf));
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
new file mode 100644
index 0000000000..37f275dbb4
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for DiskBalancer command received from SCM.
+ */
+public class DiskBalancerCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a diskBalancerCommand handler.
+   */
+  public DiskBalancerCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    // TODO: Do start/stop/update operation
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.diskBalancerCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+
+  @Override
+  public long getTotalRunTime() {
+    return totalTime;
+  }
+
+  @Override
+  public int getQueuedCount() {
+    return 0;
+  }
+
+  private void startDiskBalancer(DiskBalancerConfiguration configuration) {
+    // TODO: add implementation to start DiskBalancer
+  }
+
+  private void stopDiskBalancer() {
+    // TODO: add implementation to stop DiskBalancer
+  }
+
+  private void updateDiskBalancer(DiskBalancerConfiguration
+      configuration) {
+    // TODO: add implementation to update diskBalancer configuration
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 44f0eae49e..0685650361 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -54,6 +54,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
@@ -416,6 +417,12 @@ public class HeartbeatEndpointTask
             commandResponseProto.getRefreshVolumeUsageCommandProto());
         processCommonCommand(commandResponseProto, refreshVolumeUsageCommand);
         break;
+      case diskBalancerCommand:
+        DiskBalancerCommand diskBalancerCommand =
+            DiskBalancerCommand.getFromProtobuf(
+                commandResponseProto.getDiskBalancerCommandProto(), conf);
+        processCommonCommand(commandResponseProto, diskBalancerCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index aef3965dcd..f81b58e317 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.security.SecurityConfig;
@@ -590,4 +591,8 @@ public class OzoneContainer {
     return replicationServer;
   }
 
+  public DiskBalancerReportProto getDiskBalancerReport() {
+    // TODO: Return real disk balancer report
+    return null;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
new file mode 100644
index 0000000000..ea97f34ca0
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerCommandProto;
+
+/**
+ * Informs a datanode to update DiskBalancer status.
+ */
+public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> {
+
+  private final boolean shouldRun;
+  private final DiskBalancerConfiguration diskBalancerConfiguration;
+
+  public DiskBalancerCommand(final boolean shouldRun,
+      final DiskBalancerConfiguration diskBalancerConfiguration) {
+    this.shouldRun = shouldRun;
+    this.diskBalancerConfiguration = diskBalancerConfiguration;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.diskBalancerCommand;
+  }
+
+  @Override
+  public DiskBalancerCommandProto getProto() {
+    return DiskBalancerCommandProto.newBuilder()
+        .setShouldRun(shouldRun)
+        .setDiskBalancerConf(diskBalancerConfiguration.toProtobufBuilder())
+        .build();
+  }
+
+  public static DiskBalancerCommand getFromProtobuf(DiskBalancerCommandProto
+      diskbalancerCommandProto, ConfigurationSource configuration) {
+    Preconditions.checkNotNull(diskbalancerCommandProto);
+    return new DiskBalancerCommand(diskbalancerCommandProto.getShouldRun(),
+        DiskBalancerConfiguration.fromProtobuf(
+            diskbalancerCommandProto.getDiskBalancerConf(), configuration));
+  }
+
+  public boolean isShouldRun() {
+    return shouldRun;
+  }
+
+  public DiskBalancerConfiguration getDiskBalancerConfiguration() {
+    return diskBalancerConfiguration;
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 60404fa36b..2f38e87fed 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hdds.HddsIdFactory;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.hdds.protocol.proto.
@@ -39,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.jupiter.api.BeforeAll;
@@ -217,6 +220,45 @@ public class TestReportPublisher {
     executorService.shutdown();
   }
 
+  @Test
+  public void testDiskBalancerReportPublisher() throws IOException {
+    StateContext dummyContext = mock(StateContext.class);
+    DatanodeStateMachine dummyStateMachine =
+        mock(DatanodeStateMachine.class);
+    OzoneContainer dummyContainer = mock(OzoneContainer.class);
+
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    builder.setIsRunning(true);
+    builder.setBalancedBytes(1L);
+    builder.setDiskBalancerConf(
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder().build());
+    DiskBalancerReportProto dummyReport = builder.build();
+
+    ReportPublisher publisher = new DiskBalancerReportPublisher();
+    when(dummyContext.getParent()).thenReturn(dummyStateMachine);
+    when(dummyStateMachine.getContainer()).thenReturn(dummyContainer);
+    when(dummyContainer.getDiskBalancerReport()).thenReturn(dummyReport);
+    publisher.setConf(config);
+
+    ScheduledExecutorService executorService = HadoopExecutors
+        .newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("Unit test ReportManager Thread - %d").build());
+    publisher.init(dummyContext, executorService);
+    Message report =
+        ((DiskBalancerReportPublisher) publisher).getReport();
+    assertNotNull(report);
+    for (Descriptors.FieldDescriptor descriptor :
+        report.getDescriptorForType().getFields()) {
+      if (descriptor.getNumber() ==
+          DiskBalancerReportProto.ISRUNNING_FIELD_NUMBER) {
+        assertEquals(true, report.getField(descriptor));
+      }
+    }
+    executorService.shutdown();
+  }
+
   /**
    * Get a datanode details.
    *
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
index 44e1389bf7..1ebb05218d 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 
 import org.junit.jupiter.api.Test;
 
@@ -66,6 +67,17 @@ public class TestReportPublisherFactory {
     assertEquals(conf, publisher.getConf());
   }
 
+  @Test
+  public void testGetDiskBalancerReportPublisher() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+    ReportPublisher publisher = factory
+        .getPublisherFor(DiskBalancerReportProto.class);
+    assertEquals(DiskBalancerReportPublisher.class,
+        publisher.getClass());
+    assertEquals(conf, publisher.getConf());
+  }
+
   @Test
   public void testInvalidReportPublisher() {
     OzoneConfiguration conf = new OzoneConfiguration();
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index c1a48cf18f..91e6f5578d 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -50,6 +50,9 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpType;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
@@ -1135,11 +1138,14 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
 
   @Override
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException {
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> status,
+      int clientVersion) throws IOException {
     DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
         DatanodeDiskBalancerInfoRequestProto.newBuilder()
             .setInfoType(DatanodeDiskBalancerInfoType.status);
     hosts.ifPresent(requestBuilder::addAllHosts);
+    status.ifPresent(requestBuilder::setStatus);
     DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();
 
     DatanodeDiskBalancerInfoResponseProto response =
@@ -1151,22 +1157,83 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   @Override
-  public void startDiskBalancer(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
-      throws IOException {
+  public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+      Optional<List<String>> hosts) throws IOException {
+    HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+    threshold.ifPresent(confBuilder::setThreshold);
+    bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
+    parallelThread.ifPresent(confBuilder::setParallelThread);
+
+    DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+        DatanodeDiskBalancerOpRequestProto.newBuilder()
+            .setOpType(DatanodeDiskBalancerOpType.start)
+            .setConf(confBuilder);
+    hosts.ifPresent(requestBuilder::addAllHosts);
 
+    DatanodeDiskBalancerOpResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerOp,
+            builder -> builder.setDatanodeDiskBalancerOpRequest(
+                requestBuilder.build()))
+            .getDatanodeDiskBalancerOpResponse();
+
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+      errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+    }
+    return errors;
   }
 
   @Override
-  public void stopDiskBalancer(Optional<List<String>> hosts)
+  public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> 
hosts)
       throws IOException {
+    DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+        DatanodeDiskBalancerOpRequestProto.newBuilder()
+            .setOpType(DatanodeDiskBalancerOpType.stop);
+    hosts.ifPresent(requestBuilder::addAllHosts);
+
+    DatanodeDiskBalancerOpResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerOp,
+            builder -> builder.setDatanodeDiskBalancerOpRequest(
+                requestBuilder.build()))
+            .getDatanodeDiskBalancerOpResponse();
 
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+      errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+    }
+    return errors;
   }
 
   @Override
-  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
+  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+      Optional<Double> threshold, Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
       throws IOException {
+    HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+    threshold.ifPresent(confBuilder::setThreshold);
+    bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
+    parallelThread.ifPresent(confBuilder::setParallelThread);
+
+    DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+        DatanodeDiskBalancerOpRequestProto.newBuilder()
+            .setOpType(DatanodeDiskBalancerOpType.update)
+            .setConf(confBuilder);
+    hosts.ifPresent(requestBuilder::addAllHosts);
+
+    DatanodeDiskBalancerOpResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerOp,
+            builder -> builder.setDatanodeDiskBalancerOpRequest(
+                requestBuilder.build()))
+            .getDatanodeDiskBalancerOpResponse();
+
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+      errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+    }
+    return errors;
   }
 
   @Override
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 4730a4ccc7..fca2232921 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -85,6 +85,7 @@ message ScmContainerLocationRequest {
   optional GetContainersOnDecomNodeRequestProto 
getContainersOnDecomNodeRequest = 46;
   optional GetMetricsRequestProto getMetricsRequest = 47;
   optional DatanodeDiskBalancerInfoRequestProto 
DatanodeDiskBalancerInfoRequest = 48;
+  optional DatanodeDiskBalancerOpRequestProto datanodeDiskBalancerOpRequest = 
49;
 }
 
 message ScmContainerLocationResponse {
@@ -141,6 +142,7 @@ message ScmContainerLocationResponse {
   optional GetContainersOnDecomNodeResponseProto 
getContainersOnDecomNodeResponse = 46;
   optional GetMetricsResponseProto getMetricsResponse = 47;
   optional DatanodeDiskBalancerInfoResponseProto 
DatanodeDiskBalancerInfoResponse = 48;
+  optional DatanodeDiskBalancerOpResponseProto datanodeDiskBalancerOpResponse 
= 49;
 
   enum Status {
     OK = 1;
@@ -196,6 +198,7 @@ enum Type {
   GetContainersOnDecomNode = 42;
   GetMetrics = 43;
   DatanodeDiskBalancerInfo = 44;
+  DatanodeDiskBalancerOp = 45;
 }
 
 /**
@@ -372,12 +375,29 @@ message DatanodeDiskBalancerInfoRequestProto {
   required DatanodeDiskBalancerInfoType infoType = 1;
   optional uint32 count = 2;
   repeated string hosts = 3;
+  optional DiskBalancerRunningStatus status = 4;
 }
 
 message DatanodeDiskBalancerInfoResponseProto {
   repeated DatanodeDiskBalancerInfoProto info = 1;
 }
 
+enum DatanodeDiskBalancerOpType {
+  start = 1;
+  stop = 2;
+  update = 3;
+}
+
+message DatanodeDiskBalancerOpRequestProto {
+  required DatanodeDiskBalancerOpType opType = 1;
+  repeated string hosts = 2;
+  optional DiskBalancerConfigurationProto conf = 3;
+}
+
+message DatanodeDiskBalancerOpResponseProto {
+  repeated DatanodeAdminErrorResponseProto failedHosts = 1;
+}
+
 /*
   Decommission a list of hosts
 */
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 803842ed2d..a53d035109 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -524,13 +524,19 @@ message InnerNode {
 
 message DiskBalancerConfigurationProto {
     optional double threshold = 1;
-    optional double diskBandwidth = 2;
+    optional uint64 diskBandwidthInMB = 2;
     optional int32 parallelThread = 3;
 }
 
+enum DiskBalancerRunningStatus {
+    RUNNING = 1;
+    STOPPED = 2;
+    UNKNOWN = 3;
+}
+
 message DatanodeDiskBalancerInfoProto {
     required DatanodeDetailsProto node = 1;
     required double currentVolumeDensitySum = 2;
-    optional bool diskBalancerRunning = 3;
+    optional DiskBalancerRunningStatus runningStatus = 3;
     optional DiskBalancerConfigurationProto diskBalancerConf = 4;
 }
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 2994073c02..7a265dd0ba 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -139,6 +139,7 @@ message SCMHeartbeatRequestProto {
   optional PipelineReportsProto pipelineReports = 8;
   optional LayoutVersionProto dataNodeLayoutVersion = 9;
   optional CommandQueueReportProto commandQueueReport = 10;
+  optional DiskBalancerReportProto diskBalancerReport = 11;
 }
 
 message CommandQueueReportProto {
@@ -328,6 +329,7 @@ message SCMCommandProto {
     finalizeNewLayoutVersionCommand = 9;
     refreshVolumeUsageInfo = 10;
     reconstructECContainersCommand = 11;
+    diskBalancerCommand = 12;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -343,6 +345,7 @@ message SCMCommandProto {
   finalizeNewLayoutVersionCommandProto = 10;
   optional RefreshVolumeUsageCommandProto refreshVolumeUsageCommandProto = 11;
   optional ReconstructECContainersCommandProto 
reconstructECContainersCommandProto = 12;
+  optional DiskBalancerCommandProto diskBalancerCommandProto = 13;
 
 
   // If running upon Ratis, holds term of underlying RaftServer iff current
@@ -445,6 +448,14 @@ message DatanodeDetailsAndReplicaIndexProto {
     required int32 replicaIndex = 2;
 }
 
+/**
+This command asks the datanode to update diskBalancer status
+ */
+message DiskBalancerCommandProto {
+  required bool shouldRun = 1;
+  optional DiskBalancerConfigurationProto diskBalancerConf = 2;
+}
+
 /**
 This command asks the datanode to create a pipeline.
 */
@@ -483,6 +494,12 @@ message CRLStatusReport {
   repeated int64 pendingCrlIds=2;
 }
 
+message DiskBalancerReportProto {
+  required bool isRunning = 1;
+  optional uint64 balancedBytes = 2;
+  optional DiskBalancerConfigurationProto diskBalancerConf = 3;
+}
+
 /**
  * This command asks the datanode to process a new CRL.
  */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 0cc205b2ff..d22a878249 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CRLStatu
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerActionsFromDatanode;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -217,6 +218,14 @@ public final class SCMEvents {
       new TypedEvent<>(CRLStatusReportFromDatanode.class,
           "Crl_Status_Report");
 
+  /**
+   * DiskBalancer reports are sent out by Datanodes. This report is received by
+   * SCMDatanodeHeartbeatDispatcher and DiskBalancer_Report Event is generated.
+   */
+  public static final TypedEvent<DiskBalancerReportFromDatanode>
+      DISK_BALANCER_REPORT = new TypedEvent<>(
+          DiskBalancerReportFromDatanode.class, "DiskBalancer_Report");
+
   /**
    * Private Ctor. Never Constructed.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index 0b16c1aa95..c6114c52f5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -22,7 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -47,10 +48,13 @@ public class DiskBalancerManager {
   public static final Logger LOG =
       LoggerFactory.getLogger(DiskBalancerManager.class);
 
+
+  private OzoneConfiguration conf;
   private final EventPublisher scmNodeEventPublisher;
   private final SCMContext scmContext;
   private final NodeManager nodeManager;
   private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private Map<DatanodeDetails, Long> balancedBytesMap;
   private boolean useHostnames;
 
   /**
@@ -60,6 +64,7 @@ public class DiskBalancerManager {
                         EventPublisher eventPublisher,
                         SCMContext scmContext,
                         NodeManager nodeManager) {
+    this.conf = conf;
     this.scmNodeEventPublisher = eventPublisher;
     this.scmContext = scmContext;
     this.nodeManager = nodeManager;
@@ -95,42 +100,60 @@ public class DiskBalancerManager {
    * If hosts is null, return status of all datanodes in balancing.
    */
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException {
-    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
-        new ArrayList<>();
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> status,
+      int clientVersion) throws IOException {
     List<DatanodeDetails> filterDns = null;
     if (hosts.isPresent() && !hosts.get().isEmpty()) {
       filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
           useHostnames);
     }
 
-    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
-        HddsProtos.NodeState.HEALTHY)) {
-      if (shouldReturnDatanode(filterDns, datanodeDetails)) {
-        double volumeDensitySum =
-            getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
-        statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
-            .setCurrentVolumeDensitySum(volumeDensitySum)
-            .setDiskBalancerRunning(isRunning(datanodeDetails))
-            .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
-                    DiskBalancerStatus.DUMMY_STATUS)
-                .getDiskBalancerConfiguration().toProtobufBuilder())
-            .setNode(datanodeDetails.toProto(clientVersion))
-            .build());
-      }
+    // Filter Running Status by default
+    HddsProtos.DiskBalancerRunningStatus filterStatus = status.orElse(null);
+
+    if (filterDns != null) {
+      return filterDns.stream()
+          .filter(dn -> shouldReturnDatanode(filterStatus, dn))
+          .map(nodeManager::getDatanodeInfo)
+          .map(dn -> getInfoProto(dn, clientVersion))
+          .collect(Collectors.toList());
+    } else {
+      return nodeManager.getAllNodes().stream()
+          .filter(dn -> shouldReturnDatanode(filterStatus, dn))
+          .map(dn -> getInfoProto((DatanodeInfo)dn, clientVersion))
+          .collect(Collectors.toList());
     }
-    return statusList;
   }
 
-  private boolean shouldReturnDatanode(List<DatanodeDetails> hosts,
+  private boolean shouldReturnDatanode(
+      HddsProtos.DiskBalancerRunningStatus status,
       DatanodeDetails datanodeDetails) {
-    if (hosts == null || hosts.isEmpty()) {
-      return isRunning(datanodeDetails);
-    } else {
-      return hosts.contains(datanodeDetails);
+    boolean shouldReturn = true;
+    // If status specified, do not return if status not match.
+    if (status != null && getRunningStatus(datanodeDetails) != status) {
+      shouldReturn = false;
     }
+    return shouldReturn;
   }
 
+  private HddsProtos.DatanodeDiskBalancerInfoProto getInfoProto(
+      DatanodeInfo dn, int clientVersion) {
+    double volumeDensitySum =
+        getVolumeDataDensitySumForDatanodeDetails(dn);
+    HddsProtos.DiskBalancerRunningStatus runningStatus =
+        getRunningStatus(dn);
+    HddsProtos.DatanodeDiskBalancerInfoProto.Builder builder =
+        HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+            .setNode(dn.toProto(clientVersion))
+            .setCurrentVolumeDensitySum(volumeDensitySum)
+            .setRunningStatus(getRunningStatus(dn));
+    if (runningStatus != HddsProtos.DiskBalancerRunningStatus.UNKNOWN) {
+      builder.setDiskBalancerConf(statusMap.get(dn)
+          .getDiskBalancerConfiguration().toProtobufBuilder());
+    }
+    return builder.build();
+  }
   /**
    * Get volume density for a specific DatanodeDetails node.
    *
@@ -144,8 +167,7 @@ public class DiskBalancerManager {
     DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails;
 
     double totalCapacity = 0d, totalUsed = 0d;
-    for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto 
:
-        datanodeInfo.getStorageReports()) {
+    for (StorageReportProto reportProto : datanodeInfo.getStorageReports()) {
       totalCapacity += reportProto.getCapacity();
       totalUsed += reportProto.getScmUsed();
     }
@@ -162,10 +184,17 @@ public class DiskBalancerManager {
     return volumeDensitySum;
   }
 
-  private boolean isRunning(DatanodeDetails datanodeDetails) {
-    return statusMap
-        .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS)
-        .isRunning();
+  private HddsProtos.DiskBalancerRunningStatus getRunningStatus(
+      DatanodeDetails datanodeDetails) {
+    if (!statusMap.containsKey(datanodeDetails)) {
+      return HddsProtos.DiskBalancerRunningStatus.UNKNOWN;
+    } else {
+      if (statusMap.get(datanodeDetails).isRunning()) {
+        return HddsProtos.DiskBalancerRunningStatus.RUNNING;
+      } else {
+        return HddsProtos.DiskBalancerRunningStatus.STOPPED;
+      }
+    }
   }
 
   @VisibleForTesting
@@ -173,4 +202,24 @@ public class DiskBalancerManager {
     statusMap.put(datanodeDetails, new DiskBalancerStatus(true,
         new DiskBalancerConfiguration()));
   }
+
+  public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
+      DatanodeDetails dn) {
+    boolean isRunning = reportProto.getIsRunning();
+    DiskBalancerConfiguration diskBalancerConfiguration =
+        reportProto.hasDiskBalancerConf() ?
+            DiskBalancerConfiguration.fromProtobuf(
+                reportProto.getDiskBalancerConf(), conf) :
+            new DiskBalancerConfiguration();
+    statusMap.put(dn, new DiskBalancerStatus(isRunning,
+        diskBalancerConfiguration));
+    if (reportProto.hasBalancedBytes()) {
+      balancedBytesMap.put(dn, reportProto.getBalancedBytes());
+    }
+  }
+
+  @VisibleForTesting
+  public Map<DatanodeDetails, DiskBalancerStatus> getStatusMap() {
+    return statusMap;
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java
new file mode 100644
index 0000000000..47158972ec
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles DiskBalancer Reports from datanode.
+ */
+public class DiskBalancerReportHandler implements
+    EventHandler<DiskBalancerReportFromDatanode> {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(DiskBalancerReportHandler.class);
+
+  private DiskBalancerManager diskBalancerManager;
+
+  public DiskBalancerReportHandler(DiskBalancerManager diskBalancerManager) {
+    this.diskBalancerManager = diskBalancerManager;
+  }
+
+  @Override
+  public void onMessage(DiskBalancerReportFromDatanode reportFromDatanode,
+      EventPublisher publisher) {
+    Preconditions.checkNotNull(reportFromDatanode);
+    DatanodeDetails dn = reportFromDatanode.getDatanodeDetails();
+    DiskBalancerReportProto diskBalancerReportProto =
+        reportFromDatanode.getReport();
+    Preconditions.checkNotNull(dn,
+        "DiskBalancer Report is missing DatanodeDetails.");
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Processing diskBalancer report for dn: {}", dn);
+    }
+    try {
+      diskBalancerManager.processDiskBalancerReport(
+          diskBalancerReportProto, dn);
+    } catch (Exception e) {
+      LOGGER.error("Failed to process diskBalancer report={} from dn={}.",
+          diskBalancerReportProto, dn, e);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
index a064a6ff9f..8d2cb26c36 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -32,8 +32,6 @@ public class DiskBalancerStatus {
   private boolean isRunning;
   private DiskBalancerConfiguration diskBalancerConfiguration;
 
-  public static final DiskBalancerStatus DUMMY_STATUS =
-      new DiskBalancerStatus(false, new DiskBalancerConfiguration());
 
   public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) 
{
     this.isRunning = isRunning;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 21bcd1f78a..6063e6dd88 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -175,6 +175,14 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
    */
   DatanodeUsageInfo getUsageInfo(DatanodeDetails dn);
 
+  /**
+   * Get the datanode info of a specified datanode.
+   *
+   * @param dn the datanode for which to get the info
+   * @return DatanodeInfo of the specified datanode
+   */
+  DatanodeInfo getDatanodeInfo(DatanodeDetails dn);
+
   /**
    * Return the node stat of the specified datanode.
    * @param datanodeDetails DatanodeDetails.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index a149998db8..e3a7046afa 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -983,6 +983,23 @@ public class SCMNodeManager implements NodeManager {
     return usageInfo;
   }
 
+  /**
+   * Get the datanode info of a specified datanode.
+   *
+   * @param dn the datanode for which to get the info
+   * @return DatanodeInfo of the specified datanode, or null if not found
+   */
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+    try {
+      return nodeStateManager.getNode(dn);
+    } catch (NodeNotFoundException e) {
+      LOG.warn("Cannot retrieve DatanodeInfo, datanode {} not found.",
+          dn.getUuid());
+      return null;
+    }
+  }
+
   /**
    * Return the node stat of the specified datanode.
    *
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 71a8edad41..60b17d1fcb 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -41,6 +41,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
@@ -731,6 +732,13 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
               request.getDatanodeDiskBalancerInfoRequest(),
               request.getVersion()))
           .build();
+      case DatanodeDiskBalancerOp:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setDatanodeDiskBalancerOpResponse(getDatanodeDiskBalancerOp(
+                request.getDatanodeDiskBalancerOpRequest()))
+            .build();
       default:
         throw new IllegalArgumentException(
             "Unknown command type: " + request.getCmdType());
@@ -1317,7 +1325,9 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
       break;
     case status:
       infoProtoList = impl.getDiskBalancerStatus(
-          Optional.of(request.getHostsList()), clientVersion);
+          Optional.of(request.getHostsList()),
+          Optional.of(request.getStatus()),
+          clientVersion);
       break;
     default:
       infoProtoList = null;
@@ -1330,4 +1340,43 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
   public GetMetricsResponseProto getMetrics(GetMetricsRequestProto request) 
throws IOException {
     return 
GetMetricsResponseProto.newBuilder().setMetricsJson(impl.getMetrics(request.getQuery())).build();
   }
+
+  public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp(
+          StorageContainerLocationProtocolProtos.
+                  DatanodeDiskBalancerOpRequestProto request)
+          throws IOException {
+    List<DatanodeAdminError> errors;
+    switch (request.getOpType()) {
+    case start:
+      errors = impl.startDiskBalancer(
+              Optional.of(request.getConf().getThreshold()),
+              Optional.of(request.getConf().getDiskBandwidthInMB()),
+              Optional.of(request.getConf().getParallelThread()),
+              Optional.of(request.getHostsList()));
+      break;
+    case update:
+      errors = impl.updateDiskBalancerConfiguration(
+              Optional.of(request.getConf().getThreshold()),
+              Optional.of(request.getConf().getDiskBandwidthInMB()),
+              Optional.of(request.getConf().getParallelThread()),
+              Optional.of(request.getHostsList()));
+      break;
+    case stop:
+      errors = impl.stopDiskBalancer(Optional.of(request.getHostsList()));
+      break;
+    default:
+      errors = new ArrayList<>();
+    }
+
+    DatanodeDiskBalancerOpResponseProto.Builder response =
+            DatanodeDiskBalancerOpResponseProto.newBuilder();
+    for (DatanodeAdminError e : errors) {
+      DatanodeAdminErrorResponseProto.Builder error =
+              DatanodeAdminErrorResponseProto.newBuilder();
+      error.setHost(e.getHostname());
+      error.setError(e.getError());
+      response.addFailedHosts(error);
+    }
+    return response.build();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 6da9907db3..91883985d7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1290,7 +1290,9 @@ public class SCMClientProtocolServer implements
 
   @Override
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException {
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> status,
+      int clientVersion) throws IOException {
     // check admin authorisation
     try {
       getScm().checkAdminAccess(getRemoteUser(), true);
@@ -1299,29 +1301,33 @@ public class SCMClientProtocolServer implements
       throw e;
     }
 
-    return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts,
+    return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, status,
         clientVersion);
   }
 
   @Override
-  public void startDiskBalancer(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
-      throws IOException {
+  public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+      Optional<List<String>> hosts) throws IOException {
     // TODO: Send message to datanodes
+    return null;
   }
 
   @Override
-  public void stopDiskBalancer(Optional<List<String>> hosts)
+  public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> 
hosts)
       throws IOException {
     // TODO: Send message to datanodes
+    return null;
   }
 
 
   @Override
-  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
+  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+      Optional<Double> threshold, Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
       throws IOException {
     // TODO: Send message to datanodes
+    return null;
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 484a1e6f0f..59ff40ebf9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
@@ -57,6 +58,7 @@ import java.util.UUID;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DISK_BALANCER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents
     .INCREMENTAL_CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
@@ -201,6 +203,15 @@ public final class SCMDatanodeHeartbeatDispatcher {
                   commandStatusReport));
         }
       }
+
+      if (heartbeat.hasDiskBalancerReport()) {
+        LOG.debug("Dispatching DiskBalancer Report.");
+        eventPublisher.fireEvent(
+            DISK_BALANCER_REPORT,
+            new DiskBalancerReportFromDatanode(
+                datanodeDetails,
+                heartbeat.getDiskBalancerReport()));
+      }
     }
 
     return commands;
@@ -453,4 +464,16 @@ public final class SCMDatanodeHeartbeatDispatcher {
       super(datanodeDetails, report);
     }
   }
+
+  /**
+   * DiskBalancer report event payload with origin.
+   */
+  public static class DiskBalancerReportFromDatanode
+      extends ReportFromDatanode<DiskBalancerReportProto> {
+
+    public DiskBalancerReportFromDatanode(DatanodeDetails datanodeDetails,
+        DiskBalancerReportProto report) {
+      super(datanodeDetails, report);
+    }
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7b14cf2e1b..c1362dd298 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerReportHandler;
 import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler;
 import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService;
 import org.apache.hadoop.hdds.scm.security.RootCARotationManager;
@@ -507,6 +508,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         new PipelineActionHandler(pipelineManager, scmContext, configuration);
     CRLStatusReportHandler crlStatusReportHandler =
         new CRLStatusReportHandler(certificateStore, configuration);
+    DiskBalancerReportHandler diskBalancerReportHandler =
+        new DiskBalancerReportHandler(diskBalancerManager);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, 
scmNodeManager);
@@ -584,6 +587,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     scmNodeManager.registerSendCommandNotify(
         SCMCommandProto.Type.deleteBlocksCommand,
         scmBlockManager.getDeletedBlockLog()::onSent);
+    eventQueue.addHandler(SCMEvents.DISK_BALANCER_REPORT,
+        diskBalancerReportHandler);
   }
 
   private void initializeCertificateClient() throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 21c3f1c9a8..aeda597ab6 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -349,7 +349,36 @@ public class MockNodeManager implements NodeManager {
    */
   @Override
   public List<DatanodeDetails> getAllNodes() {
-    return new ArrayList<>(nodeMetricMap.keySet());
+    // mock storage reports for TestDiskBalancer
+    List<DatanodeDetails> healthyNodesWithInfo = new ArrayList<>();
+    for (Map.Entry<DatanodeDetails, SCMNodeStat> entry:
+        nodeMetricMap.entrySet()) {
+      NodeStatus nodeStatus = NodeStatus.inServiceHealthy();
+      if (staleNodes.contains(entry.getKey())) {
+        nodeStatus = NodeStatus.inServiceStale();
+      } else if (deadNodes.contains(entry.getKey())) {
+        nodeStatus = NodeStatus.inServiceDead();
+      }
+      DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus,
+          UpgradeUtils.defaultLayoutVersionProto());
+
+      long capacity = entry.getValue().getCapacity().get();
+      long used = entry.getValue().getScmUsed().get();
+      long remaining = entry.getValue().getRemaining().get();
+      StorageReportProto storage1 = HddsTestUtils.createStorageReport(
+          di.getUuid(), "/data1-" + di.getUuidString(),
+          capacity, used, remaining, null);
+      MetadataStorageReportProto metaStorage1 =
+          HddsTestUtils.createMetadataStorageReport(
+              "/metadata1-" + di.getUuidString(), capacity, used,
+              remaining, null);
+      di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1)));
+      di.updateMetaDataStorageReports(
+          new ArrayList<>(Arrays.asList(metaStorage1)));
+
+      healthyNodesWithInfo.add(di);
+    }
+    return healthyNodesWithInfo;
   }
 
   /**
@@ -412,6 +441,26 @@ public class MockNodeManager implements NodeManager {
     return new DatanodeUsageInfo(datanodeDetails, stat);
   }
 
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) {
+    DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(),
+        UpgradeUtils.defaultLayoutVersionProto());
+    long capacity = nodeMetricMap.get(dd).getCapacity().get();
+    long used = nodeMetricMap.get(dd).getScmUsed().get();
+    long remaining = nodeMetricMap.get(dd).getRemaining().get();
+    StorageReportProto storage1 = HddsTestUtils.createStorageReport(
+        di.getUuid(), "/data1-" + di.getUuidString(),
+        capacity, used, remaining, null);
+    MetadataStorageReportProto metaStorage1 =
+        HddsTestUtils.createMetadataStorageReport(
+            "/metadata1-" + di.getUuidString(), capacity, used,
+            remaining, null);
+    di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1)));
+    di.updateMetaDataStorageReports(
+        new ArrayList<>(Arrays.asList(metaStorage1)));
+    return di;
+  }
+
   /**
    * Return the node stat of the specified datanode.
    * @param datanodeDetails - datanode details.
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 9649159de3..0838e56ba8 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -245,6 +245,11 @@ public class SimpleMockNodeManager implements NodeManager {
     return null;
   }
 
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+    return null;
+  }
+
   @Override
   public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
     return null;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
index 541c9764b7..7005dea292 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
@@ -21,8 +21,10 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.ozone.test.GenericTestUtils;
@@ -33,6 +35,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -46,6 +49,8 @@ public class TestDiskBalancerManager {
   private NodeManager nodeManager;
   private OzoneConfiguration conf;
   private String storageDir;
+  private DiskBalancerReportHandler diskBalancerReportHandler;
+  private Random random;
 
   @BeforeEach
   public void setup() throws Exception {
@@ -56,6 +61,9 @@ public class TestDiskBalancerManager {
     nodeManager = new MockNodeManager(true, 3);
     diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(),
         SCMContext.emptyContext(), nodeManager);
+    diskBalancerReportHandler =
+        new DiskBalancerReportHandler(diskBalancerManager);
+    random = new Random();
   }
 
   @Test
@@ -73,6 +81,7 @@ public class TestDiskBalancerManager {
   @Test
   public void testDatanodeDiskBalancerStatus() throws IOException {
     diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0));
+    diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(1));
 
     // Simulate users asking all status of 3 datanodes
     List<String> dns = nodeManager.getAllNodes().stream().map(
@@ -81,6 +90,7 @@ public class TestDiskBalancerManager {
 
     List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
         diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            Optional.empty(),
             ClientVersion.CURRENT_VERSION);
 
     Assertions.assertEquals(3, statusProtoList.size());
@@ -92,8 +102,32 @@ public class TestDiskBalancerManager {
 
     statusProtoList =
         diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            Optional.empty(),
             ClientVersion.CURRENT_VERSION);
 
     Assertions.assertEquals(1, statusProtoList.size());
   }
+
+  @Test
+  public void testHandleDiskBalancerReportFromDatanode() {
+    for (DatanodeDetails dn: nodeManager.getAllNodes()) {
+      diskBalancerReportHandler.onMessage(
+          new DiskBalancerReportFromDatanode(dn, generateRandomReport()), 
null);
+    }
+
+    Assertions.assertEquals(3, diskBalancerManager.getStatusMap().size());
+  }
+
+  private DiskBalancerReportProto generateRandomReport() {
+    return DiskBalancerReportProto.newBuilder()
+        .setIsRunning(random.nextBoolean())
+        .setBalancedBytes(random.nextInt(10000))
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(random.nextInt(99))
+                .setParallelThread(random.nextInt(4) + 1)
+                .setDiskBandwidthInMB(random.nextInt(99) + 1)
+                .build())
+        .build();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 92a6fd455d..4b83abff83 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -24,6 +24,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -195,6 +196,11 @@ public class ReplicationNodeManagerMock implements 
NodeManager {
     return null;
   }
 
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+    return null;
+  }
+
   /**
    * Return the node stat of the specified datanode.
    *
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index cb3fdb42f8..641060a1e5 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -575,31 +575,34 @@ public class ContainerOperationClient implements 
ScmClient {
   }
 
   @Override
-  public void startDiskBalancer(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
-      throws IOException {
-    storageContainerLocationClient.startDiskBalancer(threshold, bandwidth,
-        hosts);
+  public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+      Optional<List<String>> hosts) throws IOException {
+    return storageContainerLocationClient.startDiskBalancer(threshold,
+        bandwidthInMB, parallelThread, hosts);
   }
 
   @Override
-  public void stopDiskBalancer(Optional<List<String>> hosts)
+  public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> 
hosts)
       throws IOException {
-    storageContainerLocationClient.stopDiskBalancer(hosts);
+    return storageContainerLocationClient.stopDiskBalancer(hosts);
   }
 
   @Override
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts) throws IOException {
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+      throws IOException {
     return storageContainerLocationClient.getDiskBalancerStatus(hosts,
-        ClientVersion.CURRENT_VERSION);
+        runningStatus, ClientVersion.CURRENT_VERSION);
   }
 
   @Override
-  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
+  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+      Optional<Double> threshold, Optional<Long> bandwidth,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
       throws IOException {
-    storageContainerLocationClient.updateDiskBalancerConfiguration(threshold,
-        bandwidth, hosts);
+    return storageContainerLocationClient.updateDiskBalancerConfiguration(
+        threshold, bandwidth, parallelThread, hosts);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
index c047cd3ee0..7eaca03fed 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -48,6 +49,7 @@ public class TestDiskBalancer {
   private static ScmClient storageClient;
   private static MiniOzoneCluster cluster;
   private static OzoneConfiguration ozoneConf;
+  private static DiskBalancerManager diskBalancerManager;
 
   @BeforeAll
   public static void setup() throws Exception {
@@ -57,6 +59,8 @@ public class TestDiskBalancer {
     cluster = 
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
     storageClient = new ContainerOperationClient(ozoneConf);
     cluster.waitForClusterToBeReady();
+    diskBalancerManager = cluster.getStorageContainerManager()
+        .getDiskBalancerManager();
 
     for (DatanodeDetails dn: cluster.getStorageContainerManager()
         .getScmNodeManager().getAllNodes()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to