lokeshj1703 commented on code in PR #3760:
URL: https://github.com/apache/ozone/pull/3760#discussion_r984232130


##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java:
##########
@@ -280,4 +280,8 @@ private HddsConfigKeys() {
 
   public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_DNAUDIT =
       "ozone.audit.log.debug.cmd.list.dnaudit";
+
+  // The directory where the datanode DiskBalancer's conf file is written.
+  public static final String HDDS_DATANODE_DISK_BALANCER_INFO_DIR =
+      "hdds.datanode.disk.balancer.info.dir";

Review Comment:
   Can this config be moved to DiskBalancerConfiguration?
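   A rough sketch of how it could look there, assuming DiskBalancerConfiguration is a @ConfigGroup (the key and field names below are hypothetical):

       @Config(key = "info.dir",
           defaultValue = "",
           type = ConfigType.STRING,
           tags = {ConfigTag.DATANODE},
           description = "The directory where the datanode DiskBalancer's "
               + "info file is persisted.")
       // Hypothetical field; the resolved key would be <group prefix>.info.dir.
       private String diskBalancerInfoDir = "";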



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -174,6 +174,13 @@
      The value should be between 0-1. Such as 0.1 which means 10% of volume
      space will be reserved.
     </description>
   </property>
+  <property>
+    <name>hdds.datanode.disk.balancer.info.dir</name>
+    <value/>
+    <tag>STORAGE, DISK_BALANCER</tag>
+    <description>The directory to persist the latest DiskBalancer information.
+    </description>
+  </property>

Review Comment:
   We can probably remove this change if config is moved to DiskBalancerConfiguration.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A per-datanode disk balancing service in charge
+ * of moving containers among disks.
+ */
+public class DiskBalancerService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerService.class);
+
+  public static final String DISK_BALANCER_DIR = "diskBalancer";
+
+  private OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  private boolean shouldRun = false;
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+
+  private AtomicLong totalBalancedBytes = new AtomicLong(0L);
+  private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
+  private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
+
+  private Map<DiskBalancerTask, Integer> inProgressTasks;
+  private Set<Long> inProgressContainers;
+  private Map<HddsVolume, Long> deltaSizes;
+  private MutableVolumeSet volumeSet;
+
+  private final File diskBalancerInfoFile;
+
+  private DiskBalancerServiceMetrics metrics;
+
+  public DiskBalancerService(OzoneContainer ozoneContainer,
+      long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
+      int workerSize, ConfigurationSource conf) throws IOException {
+    super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize,
+        serviceCheckTimeout);
+    this.ozoneContainer = ozoneContainer;
+    this.conf = conf;
+
+    String diskBalancerInfoPath = getDiskBalancerInfoPath();
+    Preconditions.checkNotNull(diskBalancerInfoPath);
+    diskBalancerInfoFile = new File(diskBalancerInfoPath);
+
+    inProgressTasks = new ConcurrentHashMap<>();
+    inProgressContainers = ConcurrentHashMap.newKeySet();
+    deltaSizes = new ConcurrentHashMap<>();
+    volumeSet = ozoneContainer.getVolumeSet();
+
+    metrics = DiskBalancerServiceMetrics.create();
+
+    loadDiskBalancerInfo();
+
+    constructTmpDir();
+  }
+
+  /**
+   * Update DiskBalancerService based on new DiskBalancerInfo.
+   * @param diskBalancerInfo
+   * @throws IOException
+   */
+  public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void constructTmpDir() {
+    for (HddsVolume volume:
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
+      Path tmpDir = getDiskBalancerTmpDir(volume);
+      try {
+        FileUtils.deleteFully(tmpDir);
+        FileUtils.createDirectories(tmpDir);
+      } catch (IOException ex) {
+        LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
+            ex);
+      }
+    }
+  }
+
+  /**
+   * If the diskBalancer.info file exists, load the file. If it does not
+   * exist, fall back to the default config.
+   * @throws IOException
+   */
+  private void loadDiskBalancerInfo() throws IOException {
+    DiskBalancerInfo diskBalancerInfo = null;
+    try {
+      if (diskBalancerInfoFile.exists()) {
+        diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile);
+      }
+    } catch (IOException e) {
+      LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. " +
+          "Falling back to default configs", e);
+    } finally {
+      if (diskBalancerInfo == null) {
+        boolean shouldRunDefault = conf.getObject(DatanodeConfiguration.class)
+            .getDiskBalancerShouldRun();
+        diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault,
+            new DiskBalancerConfiguration());
+      }
+    }
+
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
+      throws IOException {
+    // First store in local file, then update in memory variables
+    writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
+
+    setShouldRun(diskBalancerInfo.isShouldRun());
+    setThreshold(diskBalancerInfo.getThreshold());
+    setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
+    setParallelThread(diskBalancerInfo.getParallelThread());
+
+    // Default executorService is ScheduledThreadPoolExecutor, so we can
+    // update the pool size by setting corePoolSize.
+    if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) {
+      ((ScheduledThreadPoolExecutor) getExecutorService())
+          .setCorePoolSize(parallelThread);
+    }
+  }
+
+  private String getDiskBalancerInfoPath() {
+    String diskBalancerInfoDir =
+        conf.getTrimmed(HddsConfigKeys.HDDS_DATANODE_DISK_BALANCER_INFO_DIR);
+    if (Strings.isNullOrEmpty(diskBalancerInfoDir)) {
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      if (metaDirPath == null) {
+        // This means the metadata dir is not found; in theory this should
+        // not happen at this point because startup should have failed earlier.
+        throw new IllegalArgumentException("Unable to locate meta data " +
+            "directory when getting datanode disk balancer file path");
+      }
+      diskBalancerInfoDir = metaDirPath.toString();
+    }
+    // Use default datanode disk balancer file name for file path
+    return new File(diskBalancerInfoDir,
+        OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_DEFAULT).toString();
+  }
+
+  /**
+   * Read {@link DiskBalancerInfo} from a local info file.
+   *
+   * @param path DiskBalancerInfo file local path
+   * @return {@link DiskBalancerInfo}
+   * @throws IOException If the conf file is malformed or other I/O exceptions
+   */
+  private synchronized DiskBalancerInfo readDiskBalancerInfoFile(
+      File path) throws IOException {
+    if (!path.exists()) {
+      throw new IOException("DiskBalancerConf file not found.");
+    }
+    try {
+      return DiskBalancerYaml.readDiskBalancerInfoFile(path);
+    } catch (IOException e) {
+      LOG.warn("Error loading DiskBalancerInfo yaml from {}",
+          path.getAbsolutePath(), e);
+      throw new IOException("Failed to parse DiskBalancerInfo from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Persist a {@link DiskBalancerInfo} to a local file.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  private synchronized void writeDiskBalancerInfoTo(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the DiskBalancerInfo 
file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create DiskBalancerInfo 
directories.");
+      }
+    }
+    DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
+  }
+
+
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public DiskBalancerInfo getDiskBalancerInfo() {
+    return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
+        parallelThread);
+  }
+
+  public DiskBalancerReportProto getDiskBalancerReportProto() {
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    return builder.setIsRunning(shouldRun)
+        .setBalancedBytes(totalBalancedBytes.get())
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(threshold)
+                .setDiskBandwidthInMB(bandwidthInMB)
+                .setParallelThread(parallelThread)
+                .build())
+        .build();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    if (!shouldRun) {
+      return queue;
+    }
+    metrics.incrRunningLoopCount();
+
+    if (shouldDelay()) {
+      metrics.incrIdleLoopExceedsBandwidthCount();
+      return queue;
+    }
+
+    int availableTaskCount = parallelThread - inProgressTasks.size();
+    if (availableTaskCount <= 0) {
+      LOG.info("No available thread for disk balancer service. " +
+          "Current thread count is {}.", parallelThread);
+      return queue;
+    }
+
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair =
+          DiskBalancerUtils.getVolumePair(volumeSet, threshold, deltaSizes);
+      if (pair == null) {
+        continue;
+      }
+      HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight();
+      Iterator<Container<?>> itr = ozoneContainer.getController()
+          .getContainers(sourceVolume);
+      while (itr.hasNext()) {
+        ContainerData containerData = itr.next().getContainerData();
+        if (!inProgressContainers.contains(
+            containerData.getContainerID()) && containerData.isClosed()) {
+          queue.add(new DiskBalancerTask(containerData, sourceVolume,
+              destVolume));
+          inProgressContainers.add(containerData.getContainerID());
+          deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
+              - containerData.getBytesUsed());
+          deltaSizes.put(destVolume, deltaSizes.getOrDefault(destVolume, 0L)
+              + containerData.getBytesUsed());
+          break;
+        }
+      }
+    }
+    if (queue.isEmpty()) {
+      metrics.incrIdleLoopNoAvailableVolumePairCount();
+    }
+    return queue;
+  }
+
+  private boolean shouldDelay() {
+    // We should wait for next AvailableTime.
+    if (Time.monotonicNow() <= nextAvailableTime.get()) {
+      return true;
+    }
+    // Calculate the next AvailableTime based on bandwidth
+    long bytesBalanced = balancedBytesInLastWindow.getAndSet(0L);
+
+    final int megaByte = 1024 * 1024;
+
+    // Convert disk bandwidth to bytes per millisecond
+    float bytesPerMillisec = bandwidthInMB * megaByte / 1000f;
+    nextAvailableTime.set(Time.monotonicNow() +
+        ((long) (bytesBalanced / bytesPerMillisec)));
+    return false;
+  }
+
+  private class DiskBalancerTask implements BackgroundTask {
+
+    private HddsVolume sourceVolume;
+    private HddsVolume destVolume;
+    private ContainerData containerData;
+    private long containerId;
+
+    DiskBalancerTask(ContainerData containerData,
+        HddsVolume sourceVolume, HddsVolume destVolume) {
+      this.containerData = containerData;
+      this.sourceVolume = sourceVolume;
+      this.destVolume = destVolume;
+      this.containerId = containerData.getContainerID();
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      boolean destVolumeIncreased = false;
+      Path diskBalancerTmpDir = null, diskBalancerDestDir = null;
+      long containerSize = containerData.getBytesUsed();
+      try {
+        diskBalancerTmpDir = Paths.get(destVolume.getTmpDir().getPath())
+            .resolve(DISK_BALANCER_DIR).resolve(String.valueOf(containerId));
+
+        // Copy container to new Volume's tmp Dir
+        ozoneContainer.getController().copyContainer(
+            containerData.getContainerType(),
+            containerData.getContainerID(), diskBalancerTmpDir);
+
+        // Move container directory to final place on new volume
+        String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
+            destVolume, destVolume.getClusterID());
+        diskBalancerDestDir =
+            Paths.get(KeyValueContainerLocationUtil.getBaseContainerLocation(
+                destVolume.getHddsRootDir().toString(), idDir,
+                containerData.getContainerID()));
+        Path destDirParent = diskBalancerDestDir.getParent();
+        if (destDirParent != null) {
+          Files.createDirectories(destDirParent);
+        }
+        Files.move(diskBalancerTmpDir, diskBalancerDestDir,
+            StandardCopyOption.ATOMIC_MOVE,
+            StandardCopyOption.REPLACE_EXISTING);
+
+        // Generate a new Container based on destDir
+        File containerFile = ContainerUtils.getContainerFile(
+            diskBalancerDestDir.toFile());
+        if (!containerFile.exists()) {
+          throw new IOException("ContainerFile for container " + containerId
+              + " doesn't exists.");
+        }
+        ContainerData originalContainerData = ContainerDataYaml
+            .readContainerFile(containerFile);
+        Container newContainer = ozoneContainer.getController()
+            .importContainer(originalContainerData, diskBalancerDestDir);
+        newContainer.getContainerData().getVolume()
+            .incrementUsedSpace(containerSize);

Review Comment:
   Do we need this? If we are copying the entire container contents, these should be copied as well?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java:
##########
@@ -112,6 +112,29 @@ public boolean addContainer(Container<?> container) throws
     }
   }
 
+  /**
+   * Update Container in the container map.
+   * @param container container to be updated
+   * @return the updated Container
+   */
+  public Container updateContainer(Container<?> container) throws
+      StorageContainerException {
+    Preconditions.checkNotNull(container, "container cannot be null");
+
+    long containerId = container.getContainerData().getContainerID();
+    if (!containerMap.containsKey(containerId)) {
+      LOG.error("Container doesn't exists with container Id {}", containerId);
+      throw new StorageContainerException("Container doesn't exist with " +
+          "container Id " + containerId,
+          ContainerProtos.Result.CONTAINER_NOT_FOUND);
+    } else {
+      LOG.debug("Container with container Id {} is updated to containerMap",
+            containerId);

Review Comment:
   NIT: enclose in if (LOG.isDebugEnabled()) check.
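   For example:

       if (LOG.isDebugEnabled()) {
         LOG.debug("Container with container Id {} is updated in containerMap",
             containerId);
       }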



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java:
##########
@@ -112,6 +112,10 @@ public void handle(SCMCommand command, OzoneContainer container,
           SCMCommandProto.Type.deleteBlocksCommand, command.getType());
       return;
     }
+    if (shouldRequeue(command, context, container)) {
+      context.requeueCommand(command);
+      return;
+    }

Review Comment:
   For DeleteBlocks we retry the command from SCM. I am not sure if we need a requeue here.
   I think we can probably send failure to SCM in this case and add a failure message in a later PR?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java:
##########
@@ -297,6 +297,78 @@ public void setWaitOnAllFollowers(boolean val) {
   )
   private String containerSchemaV3KeySeparator = "|";
 
+  @Config(key = "disk.balancer.should.run.default",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      tags = { DATANODE, ConfigTag.DISKBALANCER},
+      description =
+          "Refresh Time interval of the Datanode DiskBalancer service. " +
+              "The Datanode will check the service periodically and update " +
+              "the config and running status for DiskBalancer service. " +
+              "Unit could be defined with postfix (ns,ms,s,m,h,d). "

Review Comment:
   Need to change the message.
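   For example, something along these lines (suggested wording only):

       description = "Default running status of the datanode DiskBalancer " +
           "service, used when no persisted diskBalancer.info file is found " +
           "on the datanode."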



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java:
##########
@@ -122,4 +129,28 @@ private static void mapDbVolumesToDataVolumesIfNeeded(
         hddsVolume.setDbVolume(globalDbVolumeMap.getOrDefault(
             hddsVolume.getStorageID(), null)));
   }
+
+  /**
+   * Get the HddsVolume according to the path.
+   * @param volumes volume list to match from
+   * @param pathStr path to match
+   */
+  public static HddsVolume matchHddsVolume(List<HddsVolume> volumes,
+      String pathStr) {
+    assert pathStr != null;
+    List<HddsVolume> resList = new ArrayList<>();
+    for (HddsVolume hddsVolume: volumes) {
+      if (pathStr.startsWith(hddsVolume.getVolumeRootDir())) {
+        resList.add(hddsVolume);
+      }
+    }
+    if (resList.size() == 1) {
+      return resList.get(0);
+    } else if (resList.size() > 1) {
+      LOG.warn("Get multi volumes {} matching path {}",
+          resList.stream().map(StorageVolume::getVolumeRootDir).collect(
+              Collectors.joining(",")), pathStr);

Review Comment:
   I think it is better to throw an exception in this case.
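   For example (a sketch; this assumes the method signature is widened to throw IOException):

       } else if (resList.size() > 1) {
         throw new IOException("Multiple volumes [" +
             resList.stream().map(StorageVolume::getVolumeRootDir)
                 .collect(Collectors.joining(",")) + "] match path " + pathStr);
       }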



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,487 @@
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair =
+          DiskBalancerUtils.getVolumePair(volumeSet, threshold, deltaSizes);

Review Comment:
   How are we making sure the same volume is not part of different tasks?
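   One option (just a sketch, the set name is hypothetical) is to track volumes that already belong to an in-progress task and skip pairs that touch them:

       Set<HddsVolume> busyVolumes = ConcurrentHashMap.newKeySet();
       // ... inside the task-creation loop, after a pair is chosen:
       if (busyVolumes.contains(sourceVolume) || busyVolumes.contains(destVolume)) {
         continue;  // one of the two volumes is already part of another task
       }
       busyVolumes.add(sourceVolume);
       busyVolumes.add(destVolume);
       // ... and remove both volumes when the DiskBalancerTask completes.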



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,487 @@
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair =
+          DiskBalancerUtils.getVolumePair(volumeSet, threshold, deltaSizes);

Review Comment:
   I think new tasks should not be added until the older tasks finish, for better consistency.
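   E.g. a simple guard near the top of getTasks() could be enough (sketch):

       if (!inProgressTasks.isEmpty()) {
         return queue;  // wait for the current batch of moves to finish
       }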



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,487 @@
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair =
+          DiskBalancerUtils.getVolumePair(volumeSet, threshold, deltaSizes);

Review Comment:
   Also, can we add a policy for choosing the source and target volumes, and a policy for choosing which container to move between those volumes? We can probably do it in a separate PR.
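   For reference, a minimal sketch of what such pluggable policies could look like. These interfaces are hypothetical, not part of this PR:
   ```java
   // Hypothetical policy hooks for a follow-up PR; names are illustrative.
   public interface VolumePairChoosingPolicy {
     // Choose an over-utilized source and an under-utilized destination,
     // or return null if no pair exceeds the threshold.
     Pair<HddsVolume, HddsVolume> chooseVolumePair(
         MutableVolumeSet volumeSet, double threshold,
         Map<HddsVolume, Long> deltaSizes);
   }

   public interface ContainerChoosingPolicy {
     // Pick the next closed container to move off the source volume,
     // or return null if none qualifies.
     ContainerData chooseContainer(OzoneContainer ozoneContainer,
         HddsVolume sourceVolume, Set<Long> inProgressContainers);
   }
   ```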



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java:
##########
@@ -67,6 +67,9 @@ public void handle(final SCMCommand command,
                      final SCMConnectionManager connectionManager) {
     final DeleteContainerCommand deleteContainerCommand =
         (DeleteContainerCommand) command;
+    if (shouldRequeue(command, context, ozoneContainer)) {
+      context.requeueCommand(command);
+    }

Review Comment:
   Can we check whether the DeleteContainer request is retried from SCM? I think it is.
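   If SCM does retry it, the handler should probably also return right after requeueing, so the same command is not both requeued and executed in one pass. A minimal sketch, reusing the `shouldRequeue` check from this diff:
   ```java
   if (shouldRequeue(command, context, ozoneContainer)) {
     // Re-enqueue for a later pass and stop here; without the return,
     // the delete would still run in this invocation.
     context.requeueCommand(command);
     return;
   }
   ```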



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A per-datanode disk balancing service that takes charge
+ * of moving containers among disks.
+ */
+public class DiskBalancerService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerService.class);
+
+  public static final String DISK_BALANCER_DIR = "diskBalancer";
+
+  private OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  private boolean shouldRun = false;
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+
+  private AtomicLong totalBalancedBytes = new AtomicLong(0L);
+  private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
+  private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
+
+  private Map<DiskBalancerTask, Integer> inProgressTasks;
+  private Set<Long> inProgressContainers;
+  private Map<HddsVolume, Long> deltaSizes;
+  private MutableVolumeSet volumeSet;
+
+  private final File diskBalancerInfoFile;
+
+  private DiskBalancerServiceMetrics metrics;
+
+  public DiskBalancerService(OzoneContainer ozoneContainer,
+      long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
+      int workerSize, ConfigurationSource conf) throws IOException {
+    super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize,
+        serviceCheckTimeout);
+    this.ozoneContainer = ozoneContainer;
+    this.conf = conf;
+
+    String diskBalancerInfoPath = getDiskBalancerInfoPath();
+    Preconditions.checkNotNull(diskBalancerInfoPath);
+    diskBalancerInfoFile = new File(diskBalancerInfoPath);
+
+    inProgressTasks = new ConcurrentHashMap<>();
+    inProgressContainers = ConcurrentHashMap.newKeySet();
+    deltaSizes = new ConcurrentHashMap<>();
+    volumeSet = ozoneContainer.getVolumeSet();
+
+    metrics = DiskBalancerServiceMetrics.create();
+
+    loadDiskBalancerInfo();
+
+    constructTmpDir();
+  }
+
+  /**
+   * Update DiskBalancerService based on new DiskBalancerInfo.
+   * @param diskBalancerInfo
+   * @throws IOException
+   */
+  public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void constructTmpDir() {
+    for (HddsVolume volume:
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
+      Path tmpDir = getDiskBalancerTmpDir(volume);
+      try {
+        FileUtils.deleteFully(tmpDir);
+        FileUtils.createDirectories(tmpDir);
+      } catch (IOException ex) {
+        LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
+            ex);
+      }
+    }
+  }
+
+  /**
+   * If the diskBalancer.info file exists, load it. Otherwise, fall
+   * back to the default config.
+   * @throws IOException
+   */
+  private void loadDiskBalancerInfo() throws IOException {
+    DiskBalancerInfo diskBalancerInfo = null;
+    try {
+      if (diskBalancerInfoFile.exists()) {
+        diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile);
+      }
+    } catch (IOException e) {
+      LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. " +
+          "Falling back to default configs", e);
+    } finally {
+      if (diskBalancerInfo == null) {
+        boolean shouldRunDefault = conf.getObject(DatanodeConfiguration.class)
+            .getDiskBalancerShouldRun();
+        diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault,
+            new DiskBalancerConfiguration());
+      }
+    }
+
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
+      throws IOException {
+    // First store in local file, then update in memory variables
+    writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
+
+    setShouldRun(diskBalancerInfo.isShouldRun());
+    setThreshold(diskBalancerInfo.getThreshold());
+    setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
+    setParallelThread(diskBalancerInfo.getParallelThread());
+
+    // Default executorService is ScheduledThreadPoolExecutor, so we can
+    // update the pool size by setting corePoolSize.
+    if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) {
+      ((ScheduledThreadPoolExecutor) getExecutorService())
+          .setCorePoolSize(parallelThread);
+    }
+  }
+
+  private String getDiskBalancerInfoPath() {
+    String diskBalancerInfoDir =
+        conf.getTrimmed(HddsConfigKeys.HDDS_DATANODE_DISK_BALANCER_INFO_DIR);
+    if (Strings.isNullOrEmpty(diskBalancerInfoDir)) {
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      if (metaDirPath == null) {
+        // This means the metadata dir was not found; in theory it should
+        // not happen here, because startup should have failed earlier.
+        throw new IllegalArgumentException("Unable to locate meta data " +
+            "directory when getting datanode disk balancer file path");
+      }
+      diskBalancerInfoDir = metaDirPath.toString();
+    }
+    // Use default datanode disk balancer file name for file path
+    return new File(diskBalancerInfoDir,
+        OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_DEFAULT).toString();
+  }
+
+  /**
+   * Read {@link DiskBalancerInfo} from a local info file.
+   *
+   * @param path DiskBalancerInfo file local path
+   * @return {@link DiskBalancerInfo}
+   * @throws IOException If the conf file is malformed or other I/O exceptions
+   */
+  private synchronized DiskBalancerInfo readDiskBalancerInfoFile(
+      File path) throws IOException {
+    if (!path.exists()) {
+      throw new IOException("DiskBalancerConf file not found.");
+    }
+    try {
+      return DiskBalancerYaml.readDiskBalancerInfoFile(path);
+    } catch (IOException e) {
+      LOG.warn("Error loading DiskBalancerInfo yaml from {}",
+          path.getAbsolutePath(), e);
+      throw new IOException("Failed to parse DiskBalancerInfo from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Persist a {@link DiskBalancerInfo} to a local file.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  private synchronized void writeDiskBalancerInfoTo(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the DiskBalancerInfo 
file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create DiskBalancerInfo 
directories.");
+      }
+    }
+    DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
+  }
+
+
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public DiskBalancerInfo getDiskBalancerInfo() {
+    return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
+        parallelThread);
+  }
+
+  public DiskBalancerReportProto getDiskBalancerReportProto() {
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    return builder.setIsRunning(shouldRun)
+        .setBalancedBytes(totalBalancedBytes.get())
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(threshold)
+                .setDiskBandwidthInMB(bandwidthInMB)
+                .setParallelThread(parallelThread)
+                .build())
+        .build();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    if (!shouldRun) {
+      return queue;
+    }
+    metrics.incrRunningLoopCount();
+
+    if (shouldDelay()) {
+      metrics.incrIdleLoopExceedsBandwidthCount();
+      return queue;
+    }
+
+    int availableTaskCount = parallelThread - inProgressTasks.size();
+    if (availableTaskCount <= 0) {
+      LOG.info("No available thread for disk balancer service. " +
+          "Current thread count is {}.", parallelThread);
+      return queue;
+    }
+
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair =
+          DiskBalancerUtils.getVolumePair(volumeSet, threshold, deltaSizes);
+      if (pair == null) {
+        continue;
+      }
+      HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight();
+      Iterator<Container<?>> itr = ozoneContainer.getController()
+          .getContainers(sourceVolume);
+      while (itr.hasNext()) {

Review Comment:
   It would be better to have one task per pair of volumes.
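   A sketch of that shape, where one task owns the (source, dest) pair and drains a pre-selected list of containers. `moveContainer` below is a placeholder for the per-container move logic currently inlined in this PR, not an existing method:
   ```java
   // Sketch: one BackgroundTask per volume pair instead of per container.
   private class VolumePairBalancerTask implements BackgroundTask {
     private final HddsVolume source;
     private final HddsVolume dest;
     private final List<ContainerData> candidates;

     VolumePairBalancerTask(HddsVolume source, HddsVolume dest,
         List<ContainerData> candidates) {
       this.source = source;
       this.dest = dest;
       this.candidates = candidates;
     }

     @Override
     public BackgroundTaskResult call() {
       long movedBytes = 0;
       for (ContainerData data : candidates) {
         // moveContainer(...) stands in for the move logic of this PR.
         movedBytes += moveContainer(data, source, dest);
       }
       LOG.info("Moved {} bytes from {} to {}", movedBytes, source, dest);
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
   }
   ```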



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A per-datanode disk balancing service that takes charge
+ * of moving containers among disks.
+ */
+public class DiskBalancerService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerService.class);
+
+  public static final String DISK_BALANCER_DIR = "diskBalancer";
+
+  private OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  private boolean shouldRun = false;
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+
+  private AtomicLong totalBalancedBytes = new AtomicLong(0L);
+  private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
+  private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
+
+  private Map<DiskBalancerTask, Integer> inProgressTasks;
+  private Set<Long> inProgressContainers;
+  private Map<HddsVolume, Long> deltaSizes;
+  private MutableVolumeSet volumeSet;
+
+  private final File diskBalancerInfoFile;
+
+  private DiskBalancerServiceMetrics metrics;
+
+  public DiskBalancerService(OzoneContainer ozoneContainer,
+      long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
+      int workerSize, ConfigurationSource conf) throws IOException {
+    super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize,
+        serviceCheckTimeout);
+    this.ozoneContainer = ozoneContainer;
+    this.conf = conf;
+
+    String diskBalancerInfoPath = getDiskBalancerInfoPath();
+    Preconditions.checkNotNull(diskBalancerInfoPath);
+    diskBalancerInfoFile = new File(diskBalancerInfoPath);
+
+    inProgressTasks = new ConcurrentHashMap<>();
+    inProgressContainers = ConcurrentHashMap.newKeySet();
+    deltaSizes = new ConcurrentHashMap<>();
+    volumeSet = ozoneContainer.getVolumeSet();
+
+    metrics = DiskBalancerServiceMetrics.create();
+
+    loadDiskBalancerInfo();
+
+    constructTmpDir();
+  }
+
+  /**
+   * Update DiskBalancerService based on new DiskBalancerInfo.
+   * @param diskBalancerInfo
+   * @throws IOException
+   */
+  public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void constructTmpDir() {
+    for (HddsVolume volume:
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
+      Path tmpDir = getDiskBalancerTmpDir(volume);
+      try {
+        FileUtils.deleteFully(tmpDir);
+        FileUtils.createDirectories(tmpDir);
+      } catch (IOException ex) {
+        LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
+            ex);
+      }
+    }
+  }
+
+  /**
+   * If the diskBalancer.info file exists, load it. Otherwise, fall
+   * back to the default config.
+   * @throws IOException
+   */
+  private void loadDiskBalancerInfo() throws IOException {
+    DiskBalancerInfo diskBalancerInfo = null;
+    try {
+      if (diskBalancerInfoFile.exists()) {
+        diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile);
+      }
+    } catch (IOException e) {
+      LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. " +
+          "Falling back to default configs", e);
+    } finally {
+      if (diskBalancerInfo == null) {
+        boolean shouldRunDefault = conf.getObject(DatanodeConfiguration.class)
+            .getDiskBalancerShouldRun();
+        diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault,
+            new DiskBalancerConfiguration());
+      }
+    }
+
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
+      throws IOException {
+    // First store in local file, then update in memory variables
+    writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
+
+    setShouldRun(diskBalancerInfo.isShouldRun());
+    setThreshold(diskBalancerInfo.getThreshold());
+    setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
+    setParallelThread(diskBalancerInfo.getParallelThread());
+
+    // Default executorService is ScheduledThreadPoolExecutor, so we can
+    // update the pool size by setting corePoolSize.
+    if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) {
+      ((ScheduledThreadPoolExecutor) getExecutorService())
+          .setCorePoolSize(parallelThread);
+    }
+  }
+
+  private String getDiskBalancerInfoPath() {
+    String diskBalancerInfoDir =
+        conf.getTrimmed(HddsConfigKeys.HDDS_DATANODE_DISK_BALANCER_INFO_DIR);
+    if (Strings.isNullOrEmpty(diskBalancerInfoDir)) {
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      if (metaDirPath == null) {
+        // This means the metadata dir was not found; in theory it should
+        // not happen here, because startup should have failed earlier.
+        throw new IllegalArgumentException("Unable to locate meta data " +
+            "directory when getting datanode disk balancer file path");
+      }
+      diskBalancerInfoDir = metaDirPath.toString();
+    }
+    // Use default datanode disk balancer file name for file path
+    return new File(diskBalancerInfoDir,
+        OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_DEFAULT).toString();
+  }
+
+  /**
+   * Read {@link DiskBalancerInfo} from a local info file.
+   *
+   * @param path DiskBalancerInfo file local path
+   * @return {@link DiskBalancerInfo}
+   * @throws IOException If the conf file is malformed or other I/O exceptions
+   */
+  private synchronized DiskBalancerInfo readDiskBalancerInfoFile(
+      File path) throws IOException {
+    if (!path.exists()) {
+      throw new IOException("DiskBalancerConf file not found.");
+    }
+    try {
+      return DiskBalancerYaml.readDiskBalancerInfoFile(path);
+    } catch (IOException e) {
+      LOG.warn("Error loading DiskBalancerInfo yaml from {}",
+          path.getAbsolutePath(), e);
+      throw new IOException("Failed to parse DiskBalancerInfo from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Persist a {@link DiskBalancerInfo} to a local file.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  private synchronized void writeDiskBalancerInfoTo(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the DiskBalancerInfo 
file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create DiskBalancerInfo 
directories.");
+      }
+    }
+    DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
+  }
+
+
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public DiskBalancerInfo getDiskBalancerInfo() {
+    return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
+        parallelThread);
+  }
+
+  public DiskBalancerReportProto getDiskBalancerReportProto() {
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    return builder.setIsRunning(shouldRun)
+        .setBalancedBytes(totalBalancedBytes.get())
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(threshold)
+                .setDiskBandwidthInMB(bandwidthInMB)
+                .setParallelThread(parallelThread)
+                .build())
+        .build();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    if (!shouldRun) {
+      return queue;
+    }
+    metrics.incrRunningLoopCount();
+
+    if (shouldDelay()) {
+      metrics.incrIdleLoopExceedsBandwidthCount();
+      return queue;
+    }
+
+    int availableTaskCount = parallelThread - inProgressTasks.size();
+    if (availableTaskCount <= 0) {
+      LOG.info("No available thread for disk balancer service. " +
+          "Current thread count is {}.", parallelThread);
+      return queue;
+    }
+
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair =
+          DiskBalancerUtils.getVolumePair(volumeSet, threshold, deltaSizes);
+      if (pair == null) {
+        continue;
+      }
+      HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight();
+      Iterator<Container<?>> itr = ozoneContainer.getController()
+          .getContainers(sourceVolume);
+      while (itr.hasNext()) {

Review Comment:
   There should be a limit on how many containers / how much data we want to move per volume in a single interval.
   Also, we do not want parallel tasks trying to move containers between the same pair of volumes.
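   A rough sketch covering both points; the extra field and the byte cap below are assumptions, not part of this PR:
   ```java
   // Track volume pairs with in-flight tasks and cap per-interval bytes.
   private final Set<Pair<HddsVolume, HddsVolume>> inProgressPairs =
       ConcurrentHashMap.newKeySet();
   private static final long MAX_BYTES_PER_VOLUME_PER_INTERVAL =
       10L * 1024 * 1024 * 1024; // assumed cap; should come from config

   // Inside getTasks(), after choosing the pair:
   if (!inProgressPairs.add(pair)) {
     continue; // another task is already balancing this pair
   }
   long scheduledBytes = 0;
   while (itr.hasNext()
       && scheduledBytes < MAX_BYTES_PER_VOLUME_PER_INTERVAL) {
     ContainerData data = itr.next().getContainerData();
     if (data.isClosed()
         && !inProgressContainers.contains(data.getContainerID())) {
       queue.add(new DiskBalancerTask(data, sourceVolume, destVolume));
       inProgressContainers.add(data.getContainerID());
       scheduledBytes += data.getBytesUsed();
     }
   }
   // DiskBalancerTask#call() would remove the pair from inProgressPairs
   // when it finishes, successfully or not.
   ```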



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A per-datanode disk balancing service that takes charge
+ * of moving containers among disks.
+ */
+public class DiskBalancerService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerService.class);
+
+  public static final String DISK_BALANCER_DIR = "diskBalancer";
+
+  private OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  private boolean shouldRun = false;
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+
+  private AtomicLong totalBalancedBytes = new AtomicLong(0L);
+  private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
+  private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
+
+  private Map<DiskBalancerTask, Integer> inProgressTasks;
+  private Set<Long> inProgressContainers;
+  private Map<HddsVolume, Long> deltaSizes;
+  private MutableVolumeSet volumeSet;
+
+  private final File diskBalancerInfoFile;
+
+  private DiskBalancerServiceMetrics metrics;
+
+  public DiskBalancerService(OzoneContainer ozoneContainer,
+      long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
+      int workerSize, ConfigurationSource conf) throws IOException {
+    super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize,
+        serviceCheckTimeout);
+    this.ozoneContainer = ozoneContainer;
+    this.conf = conf;
+
+    String diskBalancerInfoPath = getDiskBalancerInfoPath();
+    Preconditions.checkNotNull(diskBalancerInfoPath);
+    diskBalancerInfoFile = new File(diskBalancerInfoPath);
+
+    inProgressTasks = new ConcurrentHashMap<>();
+    inProgressContainers = ConcurrentHashMap.newKeySet();
+    deltaSizes = new ConcurrentHashMap<>();
+    volumeSet = ozoneContainer.getVolumeSet();
+
+    metrics = DiskBalancerServiceMetrics.create();
+
+    loadDiskBalancerInfo();
+
+    constructTmpDir();
+  }
+
+  /**
+   * Update DiskBalancerService based on new DiskBalancerInfo.
+   * @param diskBalancerInfo
+   * @throws IOException
+   */
+  public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void constructTmpDir() {
+    for (HddsVolume volume:
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
+      Path tmpDir = getDiskBalancerTmpDir(volume);
+      try {
+        FileUtils.deleteFully(tmpDir);
+        FileUtils.createDirectories(tmpDir);
+      } catch (IOException ex) {
+        LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
+            ex);
+      }
+    }
+  }
+
+  /**
+   * If the diskBalancer.info file exists, load it. Otherwise, fall
+   * back to the default config.
+   * @throws IOException
+   */
+  private void loadDiskBalancerInfo() throws IOException {
+    DiskBalancerInfo diskBalancerInfo = null;
+    try {
+      if (diskBalancerInfoFile.exists()) {
+        diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile);
+      }
+    } catch (IOException e) {
+      LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. " +
+          "Falling back to default configs", e);
+    } finally {
+      if (diskBalancerInfo == null) {
+        boolean shouldRunDefault = conf.getObject(DatanodeConfiguration.class)
+            .getDiskBalancerShouldRun();
+        diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault,
+            new DiskBalancerConfiguration());
+      }
+    }
+
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
+      throws IOException {
+    // First store in local file, then update in memory variables
+    writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
+
+    setShouldRun(diskBalancerInfo.isShouldRun());
+    setThreshold(diskBalancerInfo.getThreshold());
+    setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
+    setParallelThread(diskBalancerInfo.getParallelThread());
+
+    // Default executorService is ScheduledThreadPoolExecutor, so we can
+    // update the pool size by setting corePoolSize.
+    if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) {
+      ((ScheduledThreadPoolExecutor) getExecutorService())
+          .setCorePoolSize(parallelThread);
+    }
+  }
+
+  private String getDiskBalancerInfoPath() {
+    String diskBalancerInfoDir =
+        conf.getTrimmed(HddsConfigKeys.HDDS_DATANODE_DISK_BALANCER_INFO_DIR);
+    if (Strings.isNullOrEmpty(diskBalancerInfoDir)) {
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      if (metaDirPath == null) {
+        // This means the metadata dir was not found; in theory it should
+        // not happen here, because startup should have failed earlier.
+        throw new IllegalArgumentException("Unable to locate meta data " +
+            "directory when getting datanode disk balancer file path");
+      }
+      diskBalancerInfoDir = metaDirPath.toString();
+    }
+    // Use default datanode disk balancer file name for file path
+    return new File(diskBalancerInfoDir,
+        OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_DEFAULT).toString();
+  }
+
+  /**
+   * Read {@link DiskBalancerInfo} from a local info file.
+   *
+   * @param path DiskBalancerInfo file local path
+   * @return {@link DiskBalancerInfo}
+   * @throws IOException If the conf file is malformed or other I/O exceptions
+   */
+  private synchronized DiskBalancerInfo readDiskBalancerInfoFile(
+      File path) throws IOException {
+    if (!path.exists()) {
+      throw new IOException("DiskBalancerConf file not found.");
+    }
+    try {
+      return DiskBalancerYaml.readDiskBalancerInfoFile(path);
+    } catch (IOException e) {
+      LOG.warn("Error loading DiskBalancerInfo yaml from {}",
+          path.getAbsolutePath(), e);
+      throw new IOException("Failed to parse DiskBalancerInfo from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Persist a {@link DiskBalancerInfo} to a local file.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  private synchronized void writeDiskBalancerInfoTo(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the DiskBalancerInfo 
file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create DiskBalancerInfo 
directories.");
+      }
+    }
+    DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
+  }
+
+
+
+  public void setShouldRun(boolean shouldRun) {
+    this.shouldRun = shouldRun;
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public DiskBalancerInfo getDiskBalancerInfo() {
+    return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
+        parallelThread);
+  }
+
+  public DiskBalancerReportProto getDiskBalancerReportProto() {
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    return builder.setIsRunning(shouldRun)
+        .setBalancedBytes(totalBalancedBytes.get())
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(threshold)
+                .setDiskBandwidthInMB(bandwidthInMB)
+                .setParallelThread(parallelThread)
+                .build())
+        .build();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    if (!shouldRun) {
+      return queue;
+    }
+    metrics.incrRunningLoopCount();
+
+    if (shouldDelay()) {
+      metrics.incrIdleLoopExceedsBandwidthCount();
+      return queue;
+    }
+
+    int availableTaskCount = parallelThread - inProgressTasks.size();
+    if (availableTaskCount <= 0) {
+      LOG.info("No available thread for disk balancer service. " +
+          "Current thread count is {}.", parallelThread);
+      return queue;
+    }
+
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair =
+          DiskBalancerUtils.getVolumePair(volumeSet, threshold, deltaSizes);
+      if (pair == null) {
+        continue;
+      }
+      HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight();
+      Iterator<Container<?>> itr = ozoneContainer.getController()
+          .getContainers(sourceVolume);
+      while (itr.hasNext()) {
+        ContainerData containerData = itr.next().getContainerData();
+        if (!inProgressContainers.contains(
+            containerData.getContainerID()) && containerData.isClosed()) {
+          queue.add(new DiskBalancerTask(containerData, sourceVolume,
+              destVolume));
+          inProgressContainers.add(containerData.getContainerID());
+          deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
+              - containerData.getBytesUsed());
+          deltaSizes.put(destVolume, deltaSizes.getOrDefault(destVolume, 0L)
+              + containerData.getBytesUsed());
+          break;
+        }
+      }
+    }
+    if (queue.isEmpty()) {
+      metrics.incrIdleLoopNoAvailableVolumePairCount();
+    }
+    return queue;
+  }
+
+  private boolean shouldDelay() {
+    // We should wait for next AvailableTime.
+    if (Time.monotonicNow() <= nextAvailableTime.get()) {
+      return true;
+    }
+    // Calculate the next AvailableTime based on bandwidth
+    long bytesBalanced = balancedBytesInLastWindow.getAndSet(0L);
+
+    final int megaByte = 1024 * 1024;
+
+    // convert the configured disk bandwidth to bytes per millisecond
+    float bytesPerMillisec = bandwidthInMB * megaByte / 1000f;
+    nextAvailableTime.set(Time.monotonicNow() +
+        ((long) (bytesBalanced / bytesPerMillisec)));
+    return false;
+  }
+
+  private class DiskBalancerTask implements BackgroundTask {
+
+    private HddsVolume sourceVolume;
+    private HddsVolume destVolume;
+    private ContainerData containerData;
+    private long containerId;
+
+    DiskBalancerTask(ContainerData containerData,
+        HddsVolume sourceVolume, HddsVolume destVolume) {
+      this.containerData = containerData;
+      this.sourceVolume = sourceVolume;
+      this.destVolume = destVolume;
+      this.containerId = containerData.getContainerID();
+    }
+
+    @Override
+    public BackgroundTaskResult call() {

Review Comment:
   Can we simplify this function, and perhaps move it to a utils class or make it part of the KeyValueContainer class itself? I think we should just have an API call here.
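   Roughly, the task body could then shrink to an API call plus bookkeeping. The `KeyValueContainerUtil.moveContainer` helper below is hypothetical:
   ```java
   @Override
   public BackgroundTaskResult call() {
     try {
       // Hypothetical extracted API: performs the copy, swaps the
       // container location, and returns the number of bytes moved.
       long movedBytes = KeyValueContainerUtil.moveContainer(
           containerData, sourceVolume, destVolume);
       totalBalancedBytes.addAndGet(movedBytes);
       balancedBytesInLastWindow.addAndGet(movedBytes);
     } catch (IOException e) {
       LOG.warn("Failed to move container {} from {} to {}",
           containerId, sourceVolume, destVolume, e);
     } finally {
       inProgressContainers.remove(containerId);
     }
     return BackgroundTaskResult.EmptyTaskResult.newResult();
   }
   ```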



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

