This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dc02775  HDDS-4926. Support start/stop for container balancer via 
command line (#2278)
dc02775 is described below

commit dc027759ad0d22d5d52cc1ee994a9a723fbb64e8
Author: Jackson Yao <[email protected]>
AuthorDate: Wed Jun 23 19:34:39 2021 +0800

    HDDS-4926. Support start/stop for container balancer via command line 
(#2278)
---
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |  21 +++
 .../protocol/StorageContainerLocationProtocol.java |  21 +++
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   3 +
 ...inerLocationProtocolClientSideTranslatorPB.java |  73 +++++++++++
 .../src/main/proto/ScmAdminProtocol.proto          |  36 ++++++
 .../scm/container/balancer/ContainerBalancer.java  | 101 +++++++++++----
 .../balancer/ContainerBalancerConfiguration.java   |  28 ++++
 ...inerLocationProtocolServerSideTranslatorPB.java |  70 ++++++++++
 .../scm/server/OzoneStorageContainerManager.java   |   3 +
 .../hdds/scm/server/SCMClientProtocolServer.java   |  63 +++++++++
 .../hdds/scm/server/StorageContainerManager.java   |   6 +
 .../container/balancer/TestContainerBalancer.java  |  20 ++-
 .../hdds/scm/cli/ContainerBalancerCommands.java    | 108 ++++++++++++++++
 .../scm/cli/ContainerBalancerStartSubcommand.java  |  66 ++++++++++
 .../scm/cli/ContainerBalancerStatusSubcommand.java |  45 +++++++
 .../scm/cli/ContainerBalancerStopSubcommand.java   |  40 ++++++
 .../hdds/scm/cli/ContainerOperationClient.java     |  20 +++
 .../datanode/TestContainerBalancerSubCommand.java  | 141 +++++++++++++++++++++
 .../ozone/TestContainerBalancerOperations.java     | 112 ++++++++++++++++
 .../scm/ReconStorageContainerManagerFacade.java    |   6 +
 20 files changed, 954 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index e5c5680..f569672 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -31,6 +31,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * The interface to call into underlying container layer.
@@ -306,6 +307,26 @@ public interface ScmClient extends Closeable {
   boolean getReplicationManagerStatus() throws IOException;
 
   /**
+   * Start ContainerBalancer.
+   */
+  boolean startContainerBalancer(Optional<Double> threshold,
+      Optional<Integer> idleiterations,
+      Optional<Integer> maxDatanodesToBalance,
+      Optional<Long> maxSizeToMoveInGB) throws IOException;
+
+  /**
+   * Stop ContainerBalancer.
+   */
+  void stopContainerBalancer() throws IOException;
+
+  /**
+   * Returns ContainerBalancer status.
+   *
+   * @return True if ContainerBalancer is running, false otherwise.
+   */
+  boolean getContainerBalancerStatus() throws IOException;
+
+  /**
    * returns the list of ratis peer roles. Currently only include peer address.
    */
   List<String> getScmRatisRoles() throws IOException;
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index a2adc11..31c4472 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -36,6 +36,7 @@ import java.util.EnumSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -287,6 +288,26 @@ public interface StorageContainerLocationProtocol extends 
Closeable {
   boolean getReplicationManagerStatus() throws IOException;
 
   /**
+   * Start ContainerBalancer.
+   */
+  boolean startContainerBalancer(Optional<Double> threshold,
+      Optional<Integer> idleiterations,
+      Optional<Integer> maxDatanodesToBalance,
+      Optional<Long> maxSizeToMoveInGB) throws IOException;
+
+  /**
+   * Stop ContainerBalancer.
+   */
+  void stopContainerBalancer() throws IOException;
+
+  /**
+   * Returns ContainerBalancer status.
+   *
+   * @return True if ContainerBalancer is running, false otherwise.
+   */
+  boolean getContainerBalancerStatus() throws IOException;
+
+  /**
    * Get Datanode usage information by ip or uuid.
    *
    * @param ipaddress datanode IP address String
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index a4ae55e..9b88c6a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -44,6 +44,9 @@ public enum SCMAction implements AuditAction {
   START_REPLICATION_MANAGER,
   STOP_REPLICATION_MANAGER,
   GET_REPLICATION_MANAGER_STATUS,
+  START_CONTAINER_BALANCER,
+  STOP_CONTAINER_BALANCER,
+  GET_CONTAINER_BALANCER_STATUS,
   GET_CONTAINER_WITH_PIPELINE_BATCH,
   ADD_SCM;
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 6aeb74d..b8105ca 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -57,6 +57,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
@@ -69,6 +71,9 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartMaintenanceNodesResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
 import org.apache.hadoop.hdds.scm.DatanodeAdminError;
 import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -91,6 +96,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Consumer;
 import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 
@@ -712,6 +718,73 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
 
   }
 
+  @Override
+  public boolean startContainerBalancer(Optional<Double> threshold,
+                  Optional<Integer> idleiterations,
+                  Optional<Integer> maxDatanodesToBalance,
+                  Optional<Long> maxSizeToMoveInGB) throws IOException{
+    StartContainerBalancerRequestProto.Builder builder =
+        StartContainerBalancerRequestProto.newBuilder();
+    builder.setTraceID(TracingUtil.exportCurrentSpan());
+
+    //make balancer configuration optional
+    if (threshold.isPresent()) {
+      double tsd = threshold.get();
+      Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D,
+          "threshold should to be specified in range [0.0, 1.0).");
+      builder.setThreshold(tsd);
+    }
+    if (maxSizeToMoveInGB.isPresent()) {
+      long mstm = maxSizeToMoveInGB.get();
+      Preconditions.checkState(mstm > 0,
+          "maxSizeToMoveInGB must be positive.");
+      builder.setMaxSizeToMoveInGB(mstm);
+    }
+    if (maxDatanodesToBalance.isPresent()) {
+      int mdtb = maxDatanodesToBalance.get();
+      Preconditions.checkState(mdtb > 0,
+          "maxDatanodesToBalance must be positive.");
+      builder.setMaxDatanodesToBalance(mdtb);
+    }
+    if (idleiterations.isPresent()) {
+      int idi = idleiterations.get();
+      Preconditions.checkState(idi > 0 || idi == -1,
+          "idleiterations must be positive or" +
+              " -1(infinitly run container balancer).");
+      builder.setIdleiterations(idi);
+    }
+
+    StartContainerBalancerRequestProto request = builder.build();
+    StartContainerBalancerResponseProto response =
+        submitRequest(Type.StartContainerBalancer,
+            builder1 -> builder1.setStartContainerBalancerRequest(request))
+            .getStartContainerBalancerResponse();
+    return response.getStart();
+  }
+
+  @Override
+  public void stopContainerBalancer() throws IOException {
+
+    StopContainerBalancerRequestProto request =
+        StopContainerBalancerRequestProto.getDefaultInstance();
+    submitRequest(Type.StopContainerBalancer,
+        builder -> builder.setStopContainerBalancerRequest(request));
+
+  }
+
+  @Override
+  public boolean getContainerBalancerStatus() throws IOException {
+
+    ContainerBalancerStatusRequestProto request =
+        ContainerBalancerStatusRequestProto.getDefaultInstance();
+    ContainerBalancerStatusResponseProto response =
+        submitRequest(Type.GetContainerBalancerStatus,
+            builder -> builder.setContainerBalancerStatusRequest(request))
+            .getContainerBalancerStatusResponse();
+    return response.getIsRunning();
+
+  }
+
   /**
    * Builds request for datanode usage information and receives response.
    *
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 5389a55..d67dce9 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -69,6 +69,9 @@ message ScmContainerLocationRequest {
   optional DatanodeUsageInfoRequestProto DatanodeUsageInfoRequest = 30;
   optional GetExistContainerWithPipelinesInBatchRequestProto 
getExistContainerWithPipelinesInBatchRequest = 31;
   optional GetContainerTokenRequestProto containerTokenRequest = 32;
+  optional StartContainerBalancerRequestProto startContainerBalancerRequest = 
33;
+  optional StopContainerBalancerRequestProto stopContainerBalancerRequest = 34;
+  optional ContainerBalancerStatusRequestProto containerBalancerStatusRequest 
= 35;
 }
 
 message ScmContainerLocationResponse {
@@ -109,6 +112,9 @@ message ScmContainerLocationResponse {
   optional DatanodeUsageInfoResponseProto DatanodeUsageInfoResponse = 30;
   optional GetExistContainerWithPipelinesInBatchResponseProto 
getExistContainerWithPipelinesInBatchResponse = 31;
   optional GetContainerTokenResponseProto containerTokenResponse = 32;
+  optional StartContainerBalancerResponseProto startContainerBalancerResponse 
= 33;
+  optional StopContainerBalancerResponseProto stopContainerBalancerResponse = 
34;
+  optional ContainerBalancerStatusResponseProto 
containerBalancerStatusResponse = 35;
 
   enum Status {
     OK = 1;
@@ -147,6 +153,9 @@ enum Type {
   DatanodeUsageInfo = 25;
   GetExistContainerWithPipelinesInBatch = 26;
   GetContainerToken = 27;
+  StartContainerBalancer = 28;
+  StopContainerBalancer = 29;
+  GetContainerBalancerStatus = 30;
 }
 
 /**
@@ -445,6 +454,33 @@ message GetContainerTokenResponseProto {
   required TokenProto token = 1;
 }
 
+message StartContainerBalancerRequestProto {
+  optional string traceID = 1;
+  optional double threshold = 2;
+  optional int32 idleiterations = 3;
+  optional int32 maxDatanodesToBalance = 4;
+  optional int64 maxSizeToMoveInGB = 5;
+}
+
+message StartContainerBalancerResponseProto {
+  required bool start = 1;
+}
+
+message StopContainerBalancerRequestProto {
+  optional string traceID = 1;
+}
+
+message StopContainerBalancerResponseProto {
+}
+
+message ContainerBalancerStatusRequestProto {
+  optional string traceID = 1;
+}
+
+message ContainerBalancerStatusResponseProto {
+  required bool isRunning = 1;
+}
+
 /**
  * Protocol used from an HDFS node to StorageContainerManager.  See the request
  * and response messages for details of the RPC calls.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 4b8501ce..9371bfb 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -34,6 +34,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Container balancer is a service in SCM to move containers between over- and
@@ -51,7 +53,8 @@ public class ContainerBalancer {
   private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
-  private long maxSizeToMove;
+  private long maxSizeToMoveInGB;
+  private int idleIteration;
   private List<DatanodeUsageInfo> unBalancedNodes;
   private List<DatanodeUsageInfo> overUtilizedNodes;
   private List<DatanodeUsageInfo> underUtilizedNodes;
@@ -63,6 +66,8 @@ public class ContainerBalancer {
   private long clusterRemaining;
   private double clusterAvgUtilisation;
   private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
+  private Thread currentBalancingThread;
+  private Lock lock;
 
   /**
    * Constructs ContainerBalancer with the specified arguments. Initializes
@@ -96,28 +101,43 @@ public class ContainerBalancer {
     this.underUtilizedNodes = new ArrayList<>();
     this.unBalancedNodes = new ArrayList<>();
     this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.lock = new ReentrantLock();
   }
-
   /**
    * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
-    if (!balancerRunning.compareAndSet(false, true)) {
-      LOG.error("Container Balancer is already running.");
-      return false;
+  public boolean start(
+      ContainerBalancerConfiguration balancerConfiguration) {
+    lock.lock();
+    try {
+      if (!balancerRunning.compareAndSet(false, true)) {
+        LOG.info("Container Balancer is already running.");
+        return false;
+      }
+      
+      this.config = balancerConfiguration;
+      this.idleIteration = config.getIdleIteration();
+      this.threshold = config.getThreshold();
+      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
+      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
+      this.unBalancedNodes = new ArrayList<>();
+      LOG.info("Starting Container Balancer...{}", this);
+      //we should start a new balancer thread async
+      //and respond to the cli as soon as possible
+
+
+      //TODO: this is a temporary implementation
+      //modify this later
+      currentBalancingThread = new Thread(() -> balance());
+      currentBalancingThread.start();
+      ////////////////////////
+    } finally {
+      lock.unlock();
     }
 
-    ozoneConfiguration = new OzoneConfiguration();
-    this.config = balancerConfiguration;
-    this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-    this.maxSizeToMove = config.getMaxSizeToMove();
-    this.unBalancedNodes = new ArrayList<>();
 
-    LOG.info("Starting Container Balancer...{}", this);
-    balance();
     return true;
   }
 
@@ -125,13 +145,18 @@ public class ContainerBalancer {
    * Balances the cluster.
    */
   private void balance() {
-    initializeIteration();
-
-    // unBalancedNodes is not cleared since the next iteration uses this
-    // iteration's unBalancedNodes to find out how many nodes were balanced
-    overUtilizedNodes.clear();
-    underUtilizedNodes.clear();
-    withinThresholdUtilizedNodes.clear();
+    for (int i = 0; i < idleIteration; i++) {
+      if (!initializeIteration()) {
+        //balancer should be stopped immediately
+        break;
+      }
+      // unBalancedNodes is not cleared since the next iteration uses this
+      // iteration's unBalancedNodes to find out how many nodes were balanced
+      overUtilizedNodes.clear();
+      underUtilizedNodes.clear();
+      withinThresholdUtilizedNodes.clear();
+    }
+    balancerRunning.compareAndSet(true, false);
   }
 
   /**
@@ -152,7 +177,6 @@ public class ContainerBalancer {
     if (datanodeUsageInfos.isEmpty()) {
       LOG.info("Container Balancer could not retrieve nodes from Node " +
           "Manager.");
-      stop();
       return false;
     }
 
@@ -221,15 +245,21 @@ public class ContainerBalancer {
         maxDatanodesToBalance) {
       LOG.info("Approaching Max Datanodes To Balance limit in Container " +
           "Balancer. Stopping Balancer.");
-      stop();
       return false;
     } else {
       unBalancedNodes.addAll(overUtilizedNodes);
       unBalancedNodes.addAll(underUtilizedNodes);
 
+      //for now, we just sleep to simulate the execution of the balancer
+      //this is for acceptance tests now. modify this later when the balancer
+      //is fully completed
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {}
+      /////////////////////////////
+
       if (unBalancedNodes.isEmpty()) {
         LOG.info("Did not find any unbalanced Datanodes.");
-        stop();
         return false;
       } else {
         LOG.info("Container Balancer has identified Datanodes that need to be" 
+
@@ -323,8 +353,27 @@ public class ContainerBalancer {
    * Stops ContainerBalancer.
    */
   public void stop() {
-    balancerRunning.set(false);
-    LOG.info("Container Balancer stopped.");
+    lock.lock();
+    try {
+      //we should stop the balancer thread gracefully
+      if(!balancerRunning.get()) {
+        LOG.info("Container Balancer is not running.");
+        return;
+      }
+
+
+      //TODO: this is a temporary implementation
+      //modify this later
+      if (currentBalancingThread.isAlive()) {
+        currentBalancingThread.stop();
+      }
+      ///////////////////////////
+
+      balancerRunning.compareAndSet(true, false);
+    } finally {
+      lock.unlock();
+    }
+    LOG.info("Container Balancer stopped successfully.");
   }
 
   public void setNodeManager(NodeManager nodeManager) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 0549683..d9ae868 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -56,6 +56,11 @@ public final class ContainerBalancerConfiguration {
           "by Container Balancer.")
   private long maxSizeToMove = 10 * OzoneConsts.GB;
 
+  @Config(key = "idle.iterations", type = ConfigType.INT,
+      defaultValue = "10", tags = {ConfigTag.BALANCER},
+      description = "The idle iteration count of Container Balancer")
+  private int idleIterations = 10;
+
   /**
    * Gets the threshold value for Container Balancer.
    *
@@ -79,6 +84,29 @@ public final class ContainerBalancerConfiguration {
   }
 
   /**
+   * Gets the idle iteration value for Container Balancer.
+   *
+   * @return an idle iteration count larger than 0
+   */
+  public int getIdleIteration() {
+    return idleIterations;
+  }
+
+  /**
+   * Sets the idle iteration value for Container Balancer.
+   *
+   * @param count an idle iteration count larger than 0
+   */
+  public void setIdleIteration(int count) {
+    if (count < -1 || 0 == count) {
+      throw new IllegalArgumentException(
+          "Idle iteration count must be larger than 0 or " +
+              "-1(for infinitely running).");
+    }
+    this.idleIterations = count;
+  }
+
+  /**
    * Gets the value of maximum number of datanodes that will be balanced by
    * Container Balancer.
    *
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 07c1095..2aaa3a4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -62,6 +62,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
@@ -78,6 +80,10 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerResponseProto;
 import org.apache.hadoop.hdds.scm.DatanodeAdminError;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -97,6 +103,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.errorPipelineAlreadyExists;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.success;
@@ -306,6 +313,27 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
             .setReplicationManagerStatusResponse(getReplicationManagerStatus(
                 request.getSeplicationManagerStatusRequest()))
             .build();
+      case StartContainerBalancer:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setStartContainerBalancerResponse(startContainerBalancer(
+                request.getStartContainerBalancerRequest()))
+            .build();
+      case StopContainerBalancer:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setStopContainerBalancerResponse(stopContainerBalancer(
+                request.getStopContainerBalancerRequest()))
+            .build();
+      case GetContainerBalancerStatus:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setContainerBalancerStatusResponse(getContainerBalancerStatus(
+                request.getContainerBalancerStatusRequest()))
+            .build();
       case GetPipeline:
         return ScmContainerLocationResponse.newBuilder()
             .setCmdType(request.getCmdType())
@@ -611,6 +639,48 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
         .setIsRunning(impl.getReplicationManagerStatus()).build();
   }
 
+  public StartContainerBalancerResponseProto startContainerBalancer(
+      StartContainerBalancerRequestProto request)
+      throws IOException {
+    Optional<Double> threshold = Optional.empty();
+    Optional<Integer> idleiterations = Optional.empty();
+    Optional<Integer> maxDatanodesToBalance = Optional.empty();
+    Optional<Long> maxSizeToMoveInGB = Optional.empty();
+
+    if(request.hasThreshold()) {
+      threshold = Optional.of(request.getThreshold());
+    }
+    if(request.hasIdleiterations()) {
+      idleiterations = Optional.of(request.getIdleiterations());
+    }
+    if(request.hasMaxDatanodesToBalance()) {
+      maxDatanodesToBalance = Optional.of(request.getMaxDatanodesToBalance());
+    }
+    if(request.hasMaxSizeToMoveInGB()) {
+      maxSizeToMoveInGB = Optional.of(request.getMaxSizeToMoveInGB());
+    }
+
+    return StartContainerBalancerResponseProto.newBuilder().
+        setStart(impl.startContainerBalancer(threshold,
+            idleiterations, maxDatanodesToBalance,
+            maxSizeToMoveInGB)).build();
+  }
+
+  public StopContainerBalancerResponseProto stopContainerBalancer(
+      StopContainerBalancerRequestProto request)
+      throws IOException {
+    impl.stopContainerBalancer();
+    return StopContainerBalancerResponseProto.newBuilder().build();
+
+  }
+
+  public ContainerBalancerStatusResponseProto getContainerBalancerStatus(
+      ContainerBalancerStatusRequestProto request)
+      throws IOException {
+    return ContainerBalancerStatusResponseProto.newBuilder()
+        .setIsRunning(impl.getContainerBalancerStatus()).build();
+  }
+
   public DecommissionNodesResponseProto decommissionNodes(
       DecommissionNodesRequestProto request) throws IOException {
     List<DatanodeAdminError> errors =
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
index b0e1ae1..7948ffd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -50,6 +51,8 @@ public interface OzoneStorageContainerManager {
 
   ReplicationManager getReplicationManager();
 
+  ContainerBalancer getContainerBalancer();
+
   InetSocketAddress getDatanodeRpcAddress();
 
   SCMNodeDetails getScmNodeDetails();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index e8e6cce..a374766 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -22,6 +22,7 @@
 package org.apache.hadoop.hdds.scm.server;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.ProtocolMessageEnum;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.DatanodeAdminError;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import 
org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -60,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
 import org.apache.hadoop.ozone.audit.AuditLogger;
@@ -80,6 +83,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Collections;
 import java.util.ArrayList;
+import java.util.Optional;
 import java.util.TreeSet;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -670,6 +674,65 @@ public class SCMClientProtocolServer implements
     return scm.getReplicationManager().isRunning();
   }
 
+  @Override
+  public boolean startContainerBalancer(Optional<Double> threshold,
+                  Optional<Integer> idleiterations,
+                  Optional<Integer> maxDatanodesToBalance,
+                  Optional<Long> maxSizeToMoveInGB) throws IOException{
+    getScm().checkAdminAccess(getRemoteUser());
+    ContainerBalancerConfiguration cbc = new ContainerBalancerConfiguration();
+    if (threshold.isPresent()) {
+      double tsd = threshold.get();
+      Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D,
+          "threshold should be specified in range [0.0, 1.0).");
+      cbc.setThreshold(tsd);
+    }
+    if (maxSizeToMoveInGB.isPresent()) {
+      long mstm = maxSizeToMoveInGB.get();
+      Preconditions.checkState(mstm > 0,
+          "maxSizeToMoveInGB must be positive.");
+      cbc.setMaxSizeToMove(mstm * OzoneConsts.GB);
+    }
+    if (maxDatanodesToBalance.isPresent()) {
+      int mdtb = maxDatanodesToBalance.get();
+      Preconditions.checkState(mdtb > 0,
+          "maxDatanodesToBalance must be positive.");
+      cbc.setMaxDatanodesToBalance(mdtb);
+    }
+    if (idleiterations.isPresent()) {
+      int idi = idleiterations.get();
+      Preconditions.checkState(idi > 0 || idi == -1,
+          "idleiterations must be positive or" +
+              " -1 (infinitely run container balancer).");
+      cbc.setIdleIteration(idi);
+    }
+
+    boolean isStartedSuccessfully = scm.getContainerBalancer().start(cbc);
+    if (isStartedSuccessfully) {
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+          SCMAction.START_CONTAINER_BALANCER, null));
+    } else {
+      AUDIT.logWriteFailure(buildAuditMessageForSuccess(
+          SCMAction.START_CONTAINER_BALANCER, null));
+    }
+    return  isStartedSuccessfully;
+  }
+
+  @Override
+  public void stopContainerBalancer() throws IOException {
+    getScm().checkAdminAccess(getRemoteUser());
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+        SCMAction.STOP_CONTAINER_BALANCER, null));
+    scm.getContainerBalancer().stop();
+  }
+
+  @Override
+  public boolean getContainerBalancerStatus() {
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+        SCMAction.GET_CONTAINER_BALANCER_STATUS, null));
+    return scm.getContainerBalancer().isBalancerRunning();
+  }
+
   /**
    * Get Datanode usage info such as capacity, SCMUsed, and remaining by ip
    * or uuid.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1afdb95..8cd3357 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1512,6 +1512,12 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     return replicationManager;
   }
 
+  @VisibleForTesting
+  @Override
+  public ContainerBalancer getContainerBalancer() {
+    return containerBalancer;
+  }
+
   /**
    * Check if the current scm is the leader and ready for accepting requests.
    * @return - if the current scm is the leader and is ready.
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 4c76a26..d09f579 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -69,6 +69,7 @@ public class TestContainerBalancer {
 
     balancerConfiguration = new ContainerBalancerConfiguration();
     balancerConfiguration.setThreshold(0.1);
+    balancerConfiguration.setIdleIteration(1);
     balancerConfiguration.setMaxDatanodesToBalance(10);
     balancerConfiguration.setMaxSizeToMove(500 * OzoneConsts.GB);
     conf.setFromObject(balancerConfiguration);
@@ -96,11 +97,18 @@ public class TestContainerBalancer {
 
       balancerConfiguration.setThreshold(randomThreshold);
       containerBalancer.start(balancerConfiguration);
+
+      // wait for balancing to complete.
+      // TODO: this is a temporary implementation for now
+      // modify this after balancer is fully completed
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {}
+
       expectedUnBalancedNodes =
           determineExpectedUnBalancedNodes(randomThreshold);
       unBalancedNodesAccordingToBalancer =
           containerBalancer.getUnBalancedNodes();
-
       Assert.assertEquals(
           expectedUnBalancedNodes.size(),
           unBalancedNodesAccordingToBalancer.size());
@@ -134,11 +142,17 @@ public class TestContainerBalancer {
   @Test
   public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
     balancerConfiguration.setMaxDatanodesToBalance(2);
-    balancerConfiguration.setThreshold(0);
+    balancerConfiguration.setThreshold(0.1);
     containerBalancer.start(balancerConfiguration);
 
+    // wait for balancing to complete.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {}
+
     Assert.assertFalse(containerBalancer.isBalancerRunning());
-    containerBalancer.stop();
   }
 
   /**
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java
new file mode 100644
index 0000000..bc25444
--- /dev/null
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hdds.cli.GenericCli;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.cli.OzoneAdmin;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+
+import org.kohsuke.MetaInfServices;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.Spec;
+
+/**
+ * Subcommand to group container balancer related operations.
+ *
+ * <p>The balancer is a tool that balances datanode space usage on an Ozone
+ * cluster when some datanodes become full or when new empty nodes join
+ * the cluster. The tool can be run by the cluster administrator
+ * from command line while applications are adding and deleting blocks.
+ *
+ * <p>SYNOPSIS
+ * <pre>
+ * To start:
+ *      ozone admin containerbalancer start
+ *      [ -t/--threshold {@literal <threshold>}]
+ *      [ -i/--idleiterations {@literal <idleiterations>}]
+ *      [ -d/--maxDatanodesToBalance {@literal <maxDatanodesToBalance>}]
+ *      [ -s/--maxSizeToMoveInGB {@literal <maxSizeToMoveInGB>}]
+ *      Examples:
+ *      ozone admin containerbalancer start
+ *        start balancer with default values in the configuration
+ *      ozone admin containerbalancer start -t 0.05
+ *        start balancer with a threshold of 5%
+ *      ozone admin containerbalancer start -i 20
+ *        start balancer with maximum 20 consecutive idle iterations
+ *      ozone admin containerbalancer start -i -1
+ *        run balancer infinitely with default values in the configuration
+ *      ozone admin containerbalancer start -d 10
+ *        start balancer with maximum 10 datanodes to balance
+ *      ozone admin containerbalancer start -s 10
+ *        start balancer with maximum size of 10GB to move
+ * To stop:
+ *      ozone admin containerbalancer stop
+ * </pre>
+ *
+ * <p>DESCRIPTION
+ * <p>The threshold parameter is a fraction in the range of [0%, 100%) with a
+ * default value of 10%. The threshold sets a target for whether the cluster
+ * is balanced. A cluster is balanced if for each datanode, the utilization
+ * of the node (ratio of used space at the node to total capacity of the node)
+ * differs from the cluster utilization (ratio of used space in the cluster
+ * to total capacity of the cluster) by no more than the threshold value.
+ * The smaller the threshold, the more balanced a cluster will become.
+ * It takes more time to run the balancer for small threshold values.
+ * Also for a very small threshold the cluster may not be able to reach the
+ * balanced state when applications write and delete files concurrently.
+ *
+ * <p>The administrator can interrupt the execution of the balancer at any
+ * time by running the command "ozone admin containerbalancer stop"
+ * through command line
+ */
+@Command(
+    name = "containerbalancer",
+    description = "ContainerBalancer specific operations",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class,
+    subcommands = {
+        ContainerBalancerStartSubcommand.class,
+        ContainerBalancerStopSubcommand.class,
+        ContainerBalancerStatusSubcommand.class
+    })
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerBalancerCommands implements Callable<Void>,
+    SubcommandWithParent {
+
+  @Spec
+  private CommandSpec spec;
+
+  @Override
+  public Void call() throws Exception {
+    GenericCli.missingSubcommand(spec);
+    return null;
+  }
+
+  @Override
+  public Class<?> getParentType() {
+    return OzoneAdmin.class;
+  }
+}
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java
new file mode 100644
index 0000000..94d9ef7
--- /dev/null
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Handler to start container balancer.
+ */
+@Command(
+    name = "start",
+    description = "Start ContainerBalancer",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ContainerBalancerStartSubcommand extends ScmSubcommand {
+
+  @Option(names = {"-t", "--threshold"},
+      description = "Threshold for deciding whether the cluster is balanced")
+  private Optional<Double> threshold;
+
+  @Option(names = {"-i", "--idleiterations"},
+      description = "Maximum consecutive idle iterations")
+  private Optional<Integer> idleiterations;
+
+  @Option(names = {"-d", "--maxDatanodesToBalance"},
+      description = "Maximum datanodes to move")
+  private Optional<Integer> maxDatanodesToBalance;
+
+  @Option(names = {"-s", "--maxSizeToMoveInGB"},
+      description = "Maximum size to move in GB, " +
+          "for 10GB it should be set as 10")
+  private Optional<Long> maxSizeToMoveInGB;
+
+  @Override
+  public void execute(ScmClient scmClient) throws IOException {
+    boolean result = scmClient.startContainerBalancer(threshold, 
idleiterations,
+        maxDatanodesToBalance, maxSizeToMoveInGB);
+    if (result) {
+      System.out.println("Starting ContainerBalancer Successfully.");
+      return;
+    }
+    System.out.println("ContainerBalancer is already running, " +
+        "Please stop it first.");
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java
new file mode 100644
index 0000000..e0cd436bd
--- /dev/null
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+
+import java.io.IOException;
+
+/**
+ * Handler to query status of container balancer.
+ */
+@Command(
+    name = "status",
+    description = "Check if ContainerBalancer is running or not",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ContainerBalancerStatusSubcommand extends ScmSubcommand {
+
+  @Override
+  public void execute(ScmClient scmClient) throws IOException {
+    boolean execReturn = scmClient.getContainerBalancerStatus();
+    if(execReturn){
+      System.out.println("ContainerBalancer is Running.");
+    } else {
+      System.out.println("ContainerBalancer is Not Running.");
+    }
+  }
+}
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java
new file mode 100644
index 0000000..89e7680
--- /dev/null
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+
+import java.io.IOException;
+
+/**
+ * Handler to stop container balancer.
+ */
+@Command(
+    name = "stop",
+    description = "Stop ContainerBalancer",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ContainerBalancerStopSubcommand extends ScmSubcommand {
+  @Override
+  public void execute(ScmClient scmClient) throws IOException {
+    scmClient.stopContainerBalancer();
+    System.out.println("Stopping ContainerBalancer...");
+  }
+}
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index e48a719..aeed4ef 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.tuple.Pair;
@@ -551,6 +552,25 @@ public class ContainerOperationClient implements ScmClient 
{
   }
 
   @Override
+  public boolean startContainerBalancer(Optional<Double>threshold,
+                Optional<Integer> idleiterations,
+                Optional<Integer> maxDatanodesToBalance,
+                Optional<Long> maxSizeToMoveInGB) throws IOException {
+    return storageContainerLocationClient.startContainerBalancer(threshold,
+        idleiterations, maxDatanodesToBalance, maxSizeToMoveInGB);
+  }
+
+  @Override
+  public void stopContainerBalancer() throws IOException {
+    storageContainerLocationClient.stopContainerBalancer();
+  }
+
+  @Override
+  public boolean getContainerBalancerStatus() throws IOException {
+    return storageContainerLocationClient.getContainerBalancerStatus();
+  }
+
+  @Override
   public List<String> getScmRatisRoles() throws IOException {
     return storageContainerLocationClient.getScmInfo().getRatisPeerRoles();
   }
diff --git 
a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java
 
b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java
new file mode 100644
index 0000000..b377895
--- /dev/null
+++ 
b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStopSubcommand;
+import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStartSubcommand;
+import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStatusSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests to validate that the ContainerBalancerSubCommand class includes 
the
+ * correct output when executed against a mock client.
+ */
+public class TestContainerBalancerSubCommand {
+
+  private ContainerBalancerStopSubcommand stopCmd;
+  private ContainerBalancerStartSubcommand startCmd;
+  private ContainerBalancerStatusSubcommand statusCmd;
+  private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+  private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+  private final PrintStream originalOut = System.out;
+  private final PrintStream originalErr = System.err;
+  private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
+
+  @Before
+  public void setup() throws UnsupportedEncodingException {
+    stopCmd = new ContainerBalancerStopSubcommand();
+    startCmd = new ContainerBalancerStartSubcommand();
+    statusCmd = new ContainerBalancerStatusSubcommand();
+    System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
+    System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
+  }
+
+  @After
+  public void tearDown() {
+    System.setOut(originalOut);
+    System.setErr(originalErr);
+  }
+
+  @Test
+  public void testContainerBalancerStatusSubcommandRunning()
+      throws IOException  {
+    ScmClient scmClient = mock(ScmClient.class);
+
+    //test status is running
+    Mockito.when(scmClient.getContainerBalancerStatus())
+        .thenAnswer(invocation -> true);
+
+    statusCmd.execute(scmClient);
+
+    Pattern p = Pattern.compile(
+        "^ContainerBalancer\\sis\\sRunning.");
+    Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+    assertTrue(m.find());
+  }
+
+  @Test
+  public void testContainerBalancerStatusSubcommandNotRunning()
+      throws IOException  {
+    ScmClient scmClient = mock(ScmClient.class);
+
+    Mockito.when(scmClient.getContainerBalancerStatus())
+        .thenAnswer(invocation -> false);
+
+    statusCmd.execute(scmClient);
+
+    Pattern p = Pattern.compile(
+        "^ContainerBalancer\\sis\\sNot\\sRunning.");
+    Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+    assertTrue(m.find());
+  }
+
+  @Test
+  public void testContainerBalancerStopSubcommand() throws IOException  {
+    ScmClient scmClient = mock(ScmClient.class);
+    stopCmd.execute(scmClient);
+
+    Pattern p = Pattern.compile("^Stopping\\sContainerBalancer...");
+    Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+    assertTrue(m.find());
+  }
+
+  @Test
+  public void testContainerBalancerStartSubcommandWhenBalancerIsNotRunning()
+      throws IOException  {
+    ScmClient scmClient = mock(ScmClient.class);
+    Mockito.when(scmClient.startContainerBalancer(null, null, null, null))
+        .thenAnswer(invocation -> true);
+    startCmd.execute(scmClient);
+
+    Pattern p = Pattern.compile("^Starting\\sContainerBalancer" +
+        "\\sSuccessfully.");
+    Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+    assertTrue(m.find());
+  }
+
+  @Test
+  public void testContainerBalancerStartSubcommandWhenBalancerIsRunning()
+      throws IOException  {
+    ScmClient scmClient = mock(ScmClient.class);
+    Mockito.when(scmClient.startContainerBalancer(null, null, null, null))
+        .thenAnswer(invocation -> false);
+    startCmd.execute(scmClient);
+
+    Pattern p = Pattern.compile("^ContainerBalancer\\sis\\salready\\srunning," 
+
+        "\\sPlease\\sstop\\sit\\sfirst.");
+    Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+    assertTrue(m.find());
+  }
+
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java
new file mode 100644
index 0000000..7e52520
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests container balancer operations
+ * from the client side.
+ */
+public class TestContainerBalancerOperations {
+
+  /**
+    * Set a timeout for each test.
+    */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private static ScmClient containerBalancerClient;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ozoneConf = new OzoneConfiguration();
+    ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+    cluster = 
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
+    containerBalancerClient = new ContainerOperationClient(ozoneConf);
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    if(cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * test container balancer operation with {@link ContainerOperationClient}.
+   * @throws Exception
+   */
+  @Test
+  public void testContainerBalancerCLIOperations() throws Exception {
+    // test normally start and stop
+    boolean running = containerBalancerClient.getContainerBalancerStatus();
+    assertFalse(running);
+    Optional<Double> threshold = Optional.of(0.1);
+    Optional<Integer> idleiterations = Optional.of(10000);
+    Optional<Integer> maxDatanodesToBalance = Optional.of(1);
+    Optional<Long> maxSizeToMoveInGB = Optional.of(1L);
+
+    containerBalancerClient.startContainerBalancer(threshold, idleiterations,
+        maxDatanodesToBalance, maxSizeToMoveInGB);
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertTrue(running);
+
+    // wait for balancing to complete.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {}
+
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertFalse(running);
+
+    // test normally start , and stop it before balance is completed
+    containerBalancerClient.startContainerBalancer(threshold, idleiterations,
+        maxDatanodesToBalance, maxSizeToMoveInGB);
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertTrue(running);
+
+    containerBalancerClient.stopContainerBalancer();
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertFalse(running);
+  }
+
+  //TODO: add more acceptance after container balancer is fully completed
+}
\ No newline at end of file
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index ebef4d8..652ab6e 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -323,6 +324,11 @@ public class ReconStorageContainerManagerFacade
   }
 
   @Override
+  public ContainerBalancer getContainerBalancer() {
+    return null;
+  }
+
+  @Override
   public InetSocketAddress getDatanodeRpcAddress() {
     return getDatanodeProtocolServer().getDatanodeRpcAddress();
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to