This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ccc48495 HDDS-11284. refactor quota repair non-blocking while 
upgrade (#7035)
86ccc48495 is described below

commit 86ccc48495cf887e0e9ef302d1ba70bf06b22d98
Author: Sumit Agrawal <[email protected]>
AuthorDate: Wed Aug 21 12:50:19 2024 +0530

    HDDS-11284. refactor quota repair non-blocking while upgrade (#7035)
---
 .../org/apache/hadoop/hdds/utils/db/DBStore.java   |   8 +
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   8 +
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../src/main/proto/OmClientProtocol.proto          |  21 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   3 +
 .../om/request/volume/OMQuotaRepairRequest.java    | 191 ++++++++++
 .../om/response/volume/OMQuotaRepairResponse.java  |  72 ++++
 .../hadoop/ozone/om/service/QuotaRepairTask.java   | 401 +++++++++++++--------
 .../ozone/om/upgrade/QuotaRepairUpgradeAction.java |  27 +-
 .../ozone/om/service/TestQuotaRepairTask.java      |  68 +++-
 10 files changed, 633 insertions(+), 167 deletions(-)

diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 1985562523..3e8ea30a65 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -167,6 +167,14 @@ public interface DBStore extends Closeable, 
BatchOperationHandler {
    */
   DBCheckpoint getCheckpoint(boolean flush) throws IOException;
 
+  /**
+   * Get current snapshot of DB store as an artifact stored on
+   * the local filesystem under the given parent directory.
+   * @return An object that encapsulates the checkpoint information along with
+   * location.
+   */
+  DBCheckpoint getCheckpoint(String parentDir, boolean flush) throws 
IOException;
+
   /**
    * Get DB Store location.
    * @return DB file location.
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index d5aa961b0e..99924f724d 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -338,6 +338,14 @@ public class RDBStore implements DBStore {
     return checkPointManager.createCheckpoint(checkpointsParentDir);
   }
 
+  @Override
+  public DBCheckpoint getCheckpoint(String parentPath, boolean flush) throws 
IOException {
+    if (flush) {
+      this.flushDB();
+    }
+    return checkPointManager.createCheckpoint(parentPath, null);
+  }
+
   public DBCheckpoint getSnapshot(String name) throws IOException {
     this.flushLog(true);
     return checkPointManager.createCheckpoint(snapshotsParentDir, name);
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 82030669c9..bf27d7afb6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -327,6 +327,7 @@ public final class OmUtils {
     case SetTimes:
     case AbortExpiredMultiPartUploads:
     case SetSnapshotProperty:
+    case QuotaRepair:
     case UnknownCommand:
       return false;
     case EchoRPC:
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 32bba26608..4bdfa97b93 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -147,8 +147,8 @@ enum Type {
   ListStatusLight = 129;
   GetSnapshotInfo = 130;
   RenameSnapshot = 131;
-
   ListOpenFiles = 132;
+  QuotaRepair = 133;
 }
 
 enum SafeMode {
@@ -285,8 +285,8 @@ message OMRequest {
   optional SetSnapshotPropertyRequest       SetSnapshotPropertyRequest     = 
127;
   optional SnapshotInfoRequest              SnapshotInfoRequest            = 
128;
   optional RenameSnapshotRequest            RenameSnapshotRequest          = 
129;
-
   optional ListOpenFilesRequest             ListOpenFilesRequest           = 
130;
+  optional QuotaRepairRequest               QuotaRepairRequest             = 
131;
 }
 
 message OMResponse {
@@ -410,8 +410,8 @@ message OMResponse {
   optional SnapshotInfoResponse              SnapshotInfoResponse          = 
130;
   optional OMLockDetailsProto                omLockDetails                 = 
131;
   optional RenameSnapshotResponse            RenameSnapshotResponse        = 
132;
-
   optional ListOpenFilesResponse             ListOpenFilesResponse         = 
133;
+  optional QuotaRepairResponse            QuotaRepairResponse        = 134;
 }
 
 enum Status {
@@ -2187,6 +2187,21 @@ message SetSafeModeResponse {
   optional bool response = 1;
 }
 
+message QuotaRepairRequest {
+    repeated BucketQuotaCount bucketCount = 1;
+    required bool supportVolumeOldQuota = 2 [default=false];
+}
+message BucketQuotaCount {
+    required string volName = 1;
+    required string bucketName = 2;
+    required int64 diffUsedBytes = 3;
+    required int64 diffUsedNamespace = 4;
+    required bool supportOldQuota = 5 [default=false];
+}
+
+message QuotaRepairResponse {
+}
+
 message OMLockDetailsProto {
   optional bool isLockAcquired = 1;
   optional uint64 waitLockNanos    = 2;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 8ff59e091d..5dc640c742 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -84,6 +84,7 @@ import 
org.apache.hadoop.ozone.om.request.upgrade.OMCancelPrepareRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMFinalizeUpgradeRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMPrepareRequest;
 import org.apache.hadoop.ozone.om.request.util.OMEchoRPCWriteRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
@@ -331,6 +332,8 @@ public final class OzoneManagerRatisUtils {
       return new OMEchoRPCWriteRequest(omRequest);
     case AbortExpiredMultiPartUploads:
       return new S3ExpiredMultipartUploadsAbortRequest(omRequest);
+    case QuotaRepair:
+      return new OMQuotaRepairRequest(omRequest);
     default:
       throw new OMException("Unrecognized write command type request "
           + cmdType, OMException.ResultCodes.INVALID_REQUEST);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
new file mode 100644
index 0000000000..e307a1f95f
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMQuotaRepairResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OLD_QUOTA_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.QUOTA_RESET;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Handles a quota repair request (OMQuotaRepairRequest).
+ */
+public class OMQuotaRepairRequest extends OMClientRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMQuotaRepairRequest.class);
+
+  public OMQuotaRepairRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    UserGroupInformation ugi = createUGIForApi();
+    if (ozoneManager.getAclsEnabled() && !ozoneManager.isAdmin(ugi)) {
+      throw new OMException("Access denied for user " + ugi + ". Admin 
privilege is required for quota repair.",
+          OMException.ResultCodes.ACCESS_DENIED);
+    }
+    return super.preExecute(ozoneManager);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, 
TermIndex termIndex) {
+    final long transactionLogIndex = termIndex.getIndex();
+    OzoneManagerProtocolProtos.QuotaRepairRequest quotaRepairRequest =
+        getOmRequest().getQuotaRepairRequest();
+    Preconditions.checkNotNull(quotaRepairRequest);
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse = 
OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    Map<Pair<String, String>, OmBucketInfo> bucketMap = new HashMap<>();
+    OMClientResponse omClientResponse = null;
+    try {
+      for (int i = 0; i < quotaRepairRequest.getBucketCountCount(); ++i) {
+        OzoneManagerProtocolProtos.BucketQuotaCount bucketCountInfo = 
quotaRepairRequest.getBucketCount(i);
+        updateBucketInfo(omMetadataManager, bucketCountInfo, 
transactionLogIndex, bucketMap);
+      }
+      Map<String, OmVolumeArgs> volUpdateMap;
+      if (quotaRepairRequest.getSupportVolumeOldQuota()) {
+        volUpdateMap = updateOldVolumeQuotaSupport(omMetadataManager, 
transactionLogIndex);
+      } else {
+        volUpdateMap = Collections.emptyMap();
+      }
+      omResponse.setQuotaRepairResponse(
+          OzoneManagerProtocolProtos.QuotaRepairResponse.newBuilder().build());
+      omClientResponse = new OMQuotaRepairResponse(omResponse.build(), 
volUpdateMap, bucketMap);
+    } catch (IOException ex) {
+      LOG.error("failed to update repair count", ex);
+      omClientResponse = new 
OMQuotaRepairResponse(createErrorOMResponse(omResponse, ex));
+    } finally {
+      if (omClientResponse != null) {
+        omClientResponse.setOmLockDetails(getOmLockDetails());
+      }
+    }
+
+    return omClientResponse;
+  }
+  
+  private void updateBucketInfo(
+      OMMetadataManager omMetadataManager, 
OzoneManagerProtocolProtos.BucketQuotaCount bucketCountInfo,
+      long transactionLogIndex, Map<Pair<String, String>, OmBucketInfo> 
bucketMap) throws IOException {
+    // acquire lock.
+    mergeOmLockDetails(omMetadataManager.getLock().acquireWriteLock(
+        BUCKET_LOCK, bucketCountInfo.getVolName(), 
bucketCountInfo.getBucketName()));
+    boolean acquiredBucketLock = getOmLockDetails().isLockAcquired();
+    try {
+      String bucketKey = 
omMetadataManager.getBucketKey(bucketCountInfo.getVolName(),
+          bucketCountInfo.getBucketName());
+      OmBucketInfo bucketInfo = 
omMetadataManager.getBucketTable().get(bucketKey);
+      if (null == bucketInfo) {
+        // bucket might have been deleted while the repair count ran in parallel
+        return;
+      }
+      bucketInfo.incrUsedBytes(bucketCountInfo.getDiffUsedBytes());
+      bucketInfo.incrUsedNamespace(bucketCountInfo.getDiffUsedNamespace());
+      if (bucketCountInfo.getSupportOldQuota()) {
+        OmBucketInfo.Builder builder = bucketInfo.toBuilder();
+        if (bucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
+          builder.setQuotaInBytes(QUOTA_RESET);
+        }
+        if (bucketInfo.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
+          builder.setQuotaInNamespace(QUOTA_RESET);
+        }
+        bucketInfo = builder.build();
+      }
+
+      omMetadataManager.getBucketTable().addCacheEntry(
+          new CacheKey<>(bucketKey), CacheValue.get(transactionLogIndex, 
bucketInfo));
+      bucketMap.put(Pair.of(bucketCountInfo.getVolName(), 
bucketCountInfo.getBucketName()), bucketInfo);
+    } finally {
+      if (acquiredBucketLock) {
+        mergeOmLockDetails(omMetadataManager.getLock()
+            .releaseWriteLock(BUCKET_LOCK, bucketCountInfo.getVolName(), 
bucketCountInfo.getBucketName()));
+      }
+    }
+  }
+
+  private Map<String, OmVolumeArgs> updateOldVolumeQuotaSupport(
+      OMMetadataManager metadataManager, long transactionLogIndex) throws 
IOException {
+    LOG.info("Starting volume quota support update");
+    Map<String, OmVolumeArgs> volUpdateMap = new HashMap<>();
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
+             iterator = metadataManager.getVolumeTable().iterator()) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, OmVolumeArgs> entry = iterator.next();
+        OmVolumeArgs omVolumeArgs = entry.getValue();
+        if (!(omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT
+            || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT)) {
+          continue;
+        }
+        mergeOmLockDetails(metadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, omVolumeArgs.getVolume()));
+        boolean acquiredVolumeLock = getOmLockDetails().isLockAcquired();
+        try {
+          boolean isQuotaReset = false;
+          if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
+            omVolumeArgs.setQuotaInBytes(QUOTA_RESET);
+            isQuotaReset = true;
+          }
+          if (omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
+            omVolumeArgs.setQuotaInNamespace(QUOTA_RESET);
+            isQuotaReset = true;
+          }
+          if (isQuotaReset) {
+            metadataManager.getVolumeTable().addCacheEntry(
+                new CacheKey<>(entry.getKey()), 
CacheValue.get(transactionLogIndex, omVolumeArgs));
+            volUpdateMap.put(entry.getKey(), omVolumeArgs);
+          }
+        } finally {
+          if (acquiredVolumeLock) {
+            
mergeOmLockDetails(metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, 
omVolumeArgs.getVolume()));
+          }
+        }
+      }
+    }
+    LOG.info("Completed volume quota support update for volume count {}", 
volUpdateMap.size());
+    return volUpdateMap;
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java
new file mode 100644
index 0000000000..8fa028d743
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+
+/**
+ * Response for {@link OMQuotaRepairRequest} request.
+ */
+@CleanupTableInfo(cleanupTables = {VOLUME_TABLE, BUCKET_TABLE})
+public class OMQuotaRepairResponse extends OMClientResponse {
+  private Map<String, OmVolumeArgs> volumeArgsMap;
+  private Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap;
+
+  /**
+   * Creates the response returned when the quota repair request fails.
+   */
+  public OMQuotaRepairResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+  }
+
+  public OMQuotaRepairResponse(
+      @Nonnull OMResponse omResponse, Map<String, OmVolumeArgs> volumeArgsMap,
+      Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap) {
+    super(omResponse);
+    this.volBucketInfoMap = volBucketInfoMap;
+    this.volumeArgsMap = volumeArgsMap;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager metadataManager,
+      BatchOperation batchOp) throws IOException {
+    for (OmBucketInfo omBucketInfo : volBucketInfoMap.values()) {
+      metadataManager.getBucketTable().putWithBatch(batchOp,
+          metadataManager.getBucketKey(omBucketInfo.getVolumeName(),
+              omBucketInfo.getBucketName()), omBucketInfo);
+    }
+    for (OmVolumeArgs volArgs : volumeArgsMap.values()) {
+      metadataManager.getVolumeTable().putWithBatch(batchOp, 
volArgs.getVolume(), volArgs);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
index 8a8ebd06f4..b3e64c98c5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
@@ -20,14 +20,18 @@
 package org.apache.hadoop.ozone.om.service;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.protobuf.ServiceException;
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -36,25 +40,29 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OLD_QUOTA_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.QUOTA_RESET;
-import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
-import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 /**
  * Quota repair task.
@@ -64,117 +72,189 @@ public class QuotaRepairTask {
       QuotaRepairTask.class);
   private static final int BATCH_SIZE = 5000;
   private static final int TASK_THREAD_CNT = 3;
-  public static final long EPOCH_DEFAULT = -1L;
-  private final OMMetadataManager metadataManager;
-  private final Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
-  private final Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
+  private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false);
+  private static final RepairStatus REPAIR_STATUS = new RepairStatus();
+  private final OzoneManager om;
+  private final AtomicLong runCount = new AtomicLong(0);
   private ExecutorService executor;
-  private final Map<String, CountPair> keyCountMap = new ConcurrentHashMap<>();
-  private final Map<String, CountPair> fileCountMap
-      = new ConcurrentHashMap<>();
-  private final Map<String, CountPair> directoryCountMap
-      = new ConcurrentHashMap<>();
-  private final Map<String, String> oldVolumeKeyNameMap = new HashMap();
+  public QuotaRepairTask(OzoneManager ozoneManager) {
+    this.om = ozoneManager;
+  }
 
-  public QuotaRepairTask(OMMetadataManager metadataManager) {
-    this.metadataManager = metadataManager;
+  public CompletableFuture<Boolean> repair() throws Exception {
+    // mark a repair as in progress and reject any other request meanwhile
+    if (!IN_PROGRESS.compareAndSet(false, true)) {
+      LOG.info("quota repair task already running");
+      return CompletableFuture.supplyAsync(() -> false);
+    }
+    REPAIR_STATUS.reset(runCount.get() + 1);
+    return CompletableFuture.supplyAsync(() -> repairTask());
   }
-  
-  public void repair() throws Exception {
-    LOG.info("Starting quota repair task");
-    prepareAllVolumeBucketInfo();
 
-    IOzoneManagerLock lock = metadataManager.getLock();
-    // thread pool with 3 Table type * (1 task each + 3 thread each)
-    executor = Executors.newFixedThreadPool(12);
+  public static String getStatus() {
+    return REPAIR_STATUS.toString();
+  }
+  private boolean repairTask() {
+    LOG.info("Starting quota repair task {}", REPAIR_STATUS);
+    OMMetadataManager activeMetaManager = null;
     try {
-      nameBucketInfoMap.values().stream().forEach(e -> lock.acquireReadLock(
-          BUCKET_LOCK, e.getVolumeName(), e.getBucketName()));
-      repairCount();
+      // thread pool with 3 table types * (1 task each + 3 threads for each task)
+      executor = Executors.newFixedThreadPool(3 * (1 + TASK_THREAD_CNT));
+      OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder
+          = OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
+      // repair active db
+      activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(), 
om.getConfiguration());
+      repairActiveDb(activeMetaManager, builder);
+
+      // TODO: repair snapshots for quota
+
+      // submit request to update
+      ClientId clientId = ClientId.randomId();
+      OzoneManagerProtocolProtos.OMRequest omRequest = 
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+          .setCmdType(OzoneManagerProtocolProtos.Type.QuotaRepair)
+          .setQuotaRepairRequest(builder.build())
+          .setClientId(clientId.toString())
+          .build();
+      OzoneManagerProtocolProtos.OMResponse response = 
submitRequest(omRequest, clientId);
+      if (response != null && !response.getSuccess()) {
+        LOG.error("update quota repair count response failed");
+        REPAIR_STATUS.updateStatus("Response for update DB is failed");
+        return false;
+      } else {
+        REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
+      }
+    } catch (Exception exp) {
+      LOG.error("quota repair count failed", exp);
+      REPAIR_STATUS.updateStatus(exp.toString());
+      return false;
     } finally {
-      nameBucketInfoMap.values().stream().forEach(e -> lock.releaseReadLock(
-          BUCKET_LOCK, e.getVolumeName(), e.getBucketName()));
+      LOG.info("Completed quota repair task {}", REPAIR_STATUS);
       executor.shutdown();
-      LOG.info("Completed quota repair task");
+      try {
+        if (null != activeMetaManager) {
+          activeMetaManager.stop();
+        }
+        cleanTempCheckPointPath(om.getMetadataManager());
+      } catch (Exception exp) {
+        LOG.error("failed to cleanup", exp);
+      }
+      IN_PROGRESS.set(false);
     }
-    updateOldVolumeQuotaSupport();
-
-    // cleanup epoch added to avoid extra epoch id in cache
-    ArrayList<Long> epochs = new ArrayList<>();
-    epochs.add(EPOCH_DEFAULT);
-    metadataManager.getBucketTable().cleanupCache(epochs);
-    metadataManager.getVolumeTable().cleanupCache(epochs);
+    return true;
   }
-  
-  private void prepareAllVolumeBucketInfo() throws IOException {
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
-        iterator = metadataManager.getVolumeTable().iterator()) {
 
-      OmVolumeArgs omVolumeArgs;
-      while (iterator.hasNext()) {
-        Table.KeyValue<String, OmVolumeArgs> entry =
-            iterator.next();
-        omVolumeArgs = entry.getValue();
-        getAllBuckets(omVolumeArgs.getVolume(), omVolumeArgs.getObjectID());
-        if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT
-            || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
-          oldVolumeKeyNameMap.put(entry.getKey(), 
entry.getValue().getVolume());
-        }
+  private void repairActiveDb(
+      OMMetadataManager metadataManager,
+      OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder) throws 
Exception {
+    Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
+    Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
+    Map<String, OmBucketInfo> oriBucketInfoMap = new HashMap<>();
+    prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, 
metadataManager);
+
+    repairCount(nameBucketInfoMap, idBucketInfoMap, metadataManager);
+
+    // add an entry to the request for each bucket that needs an update
+    for (Map.Entry<String, OmBucketInfo> entry : nameBucketInfoMap.entrySet()) 
{
+      OmBucketInfo oriBucketInfo = oriBucketInfoMap.get(entry.getKey());
+      OmBucketInfo updatedBuckedInfo = entry.getValue();
+      boolean oldQuota = oriBucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT
+          || oriBucketInfo.getQuotaInNamespace() == OLD_QUOTA_DEFAULT;
+      if (!(oldQuota || isChange(oriBucketInfo, updatedBuckedInfo))) {
+        continue;
       }
+      OzoneManagerProtocolProtos.BucketQuotaCount.Builder bucketCountBuilder
+          = OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder();
+      bucketCountBuilder.setVolName(updatedBuckedInfo.getVolumeName());
+      bucketCountBuilder.setBucketName(updatedBuckedInfo.getBucketName());
+      bucketCountBuilder.setDiffUsedBytes(updatedBuckedInfo.getUsedBytes() - 
oriBucketInfo.getUsedBytes());
+      bucketCountBuilder.setDiffUsedNamespace(
+          updatedBuckedInfo.getUsedNamespace() - 
oriBucketInfo.getUsedNamespace());
+      bucketCountBuilder.setSupportOldQuota(oldQuota);
+      builder.addBucketCount(bucketCountBuilder.build());
     }
+
+    // ask the request handler to also reset old-style volume quotas
+    builder.setSupportVolumeOldQuota(true);
   }
 
-  private void updateOldVolumeQuotaSupport() throws IOException {
-    LOG.info("Starting volume quota support update");
-    IOzoneManagerLock lock = metadataManager.getLock();
-    try (BatchOperation batchOperation = metadataManager.getStore()
-        .initBatchOperation()) {
-      for (Map.Entry<String, String> volEntry
-          : oldVolumeKeyNameMap.entrySet()) {
-        lock.acquireReadLock(VOLUME_LOCK, volEntry.getValue());
-        try {
-          OmVolumeArgs omVolumeArgs = metadataManager.getVolumeTable().get(
-              volEntry.getKey());
-          boolean isQuotaReset = false;
-          if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
-            omVolumeArgs.setQuotaInBytes(QUOTA_RESET);
-            isQuotaReset = true;
-          }
-          if (omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
-            omVolumeArgs.setQuotaInNamespace(QUOTA_RESET);
-            isQuotaReset = true;
-          }
-          if (isQuotaReset) {
-            metadataManager.getVolumeTable().addCacheEntry(
-                new CacheKey<>(volEntry.getKey()),
-                CacheValue.get(EPOCH_DEFAULT, omVolumeArgs));
-            metadataManager.getVolumeTable().putWithBatch(batchOperation,
-                volEntry.getKey(), omVolumeArgs);
-          }
-        } finally {
-          lock.releaseReadLock(VOLUME_LOCK, volEntry.getValue());
-        }
+  private OzoneManagerProtocolProtos.OMResponse submitRequest(
+      OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) {
+    try {
+      if (om.isRatisEnabled()) {
+        OzoneManagerRatisServer server = om.getOmRatisServer();
+        RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+            .setClientId(clientId)
+            .setServerId(om.getOmRatisServer().getRaftPeerId())
+            .setGroupId(om.getOmRatisServer().getRaftGroupId())
+            .setCallId(runCount.getAndIncrement())
+            .setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)))
+            .setType(RaftClientRequest.writeRequestType())
+            .build();
+        return server.submitRequest(omRequest, raftClientRequest);
+      } else {
+        return om.getOmServerProtocol().submitRequest(
+            null, omRequest);
       }
-      metadataManager.getStore().commitBatchOperation(batchOperation);
+    } catch (ServiceException e) {
+      LOG.error("repair quota count " + omRequest.getCmdType() + " request failed.", e);
     }
-    LOG.info("Completed volume quota support update");
+    return null;
   }
   
-  private void getAllBuckets(String volumeName, long volumeId)
-      throws IOException {
-    List<OmBucketInfo> bucketList = metadataManager.listBuckets(
-        volumeName, null, null, Integer.MAX_VALUE, false);
-    for (OmBucketInfo bucketInfo : bucketList) {
-      bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
-      bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
-      nameBucketInfoMap.put(buildNamePath(volumeName,
-          bucketInfo.getBucketName()), bucketInfo);
-      idBucketInfoMap.put(buildIdPath(volumeId, bucketInfo.getObjectID()),
-          bucketInfo);
+  private OMMetadataManager createActiveDBCheckpoint(
+      OMMetadataManager omMetaManager, OzoneConfiguration conf) throws IOException {
+    // cleanup
+    String parentPath = cleanTempCheckPointPath(omMetaManager);
+
+    // create snapshot
+    DBCheckpoint checkpoint = omMetaManager.getStore().getCheckpoint(parentPath, true);
+    return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint);
+  }
+
+  private static String cleanTempCheckPointPath(OMMetadataManager omMetaManager) throws IOException {
+    File dbLocation = omMetaManager.getStore().getDbLocation();
+    if (dbLocation == null) {
+      throw new NullPointerException("db location is null");
+    }
+    String tempData = dbLocation.getParent();
+    if (tempData == null) {
+      throw new NullPointerException("parent db dir is null");
+    }
+    File repairTmpPath = Paths.get(tempData, "temp-repair-quota").toFile();
+    FileUtils.deleteDirectory(repairTmpPath);
+    FileUtils.forceMkdir(repairTmpPath);
+    return repairTmpPath.toString();
+  }
+
+  private void prepareAllBucketInfo(
+      Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo> idBucketInfoMap,
+      Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager metadataManager) throws IOException {
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
+             iterator = metadataManager.getBucketTable().iterator()) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, OmBucketInfo> entry = iterator.next();
+        OmBucketInfo bucketInfo = entry.getValue();
+        String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(),
+            bucketInfo.getBucketName());
+        oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
+        bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
+        bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
+        nameBucketInfoMap.put(bucketNameKey, bucketInfo);
+        idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
+                bucketInfo.getObjectID()), bucketInfo);
+      }
+    }
+  }
+
+  private boolean isChange(OmBucketInfo lBucketInfo, OmBucketInfo rBucketInfo) {
+    if (lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace()
+        || lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes()) {
+      return true;
     }
+    return false;
   }
   
-  private String buildNamePath(String volumeName, String bucketName) {
+  private static String buildNamePath(String volumeName, String bucketName) {
     final StringBuilder builder = new StringBuilder();
     builder.append(OM_KEY_PREFIX)
         .append(volumeName)
@@ -184,7 +264,7 @@ public class QuotaRepairTask {
     return builder.toString();
   }
 
-  private String buildIdPath(long volumeId, long bucketId) {
+  private static String buildIdPath(long volumeId, long bucketId) {
     final StringBuilder builder = new StringBuilder();
     builder.append(OM_KEY_PREFIX)
         .append(volumeId)
@@ -194,8 +274,13 @@ public class QuotaRepairTask {
     return builder.toString();
   }
   
-  private void repairCount() throws Exception {
-    LOG.info("Starting quota repair for all keys, files and directories");
+  private void repairCount(
+      Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo> idBucketInfoMap,
+      OMMetadataManager metadataManager) throws Exception {
+    LOG.info("Starting quota repair counting for all keys, files and directories");
+    Map<String, CountPair> keyCountMap = new ConcurrentHashMap<>();
+    Map<String, CountPair> fileCountMap = new ConcurrentHashMap<>();
+    Map<String, CountPair> directoryCountMap = new ConcurrentHashMap<>();
     try {
       nameBucketInfoMap.keySet().stream().forEach(e -> keyCountMap.put(e,
           new CountPair()));
@@ -225,51 +310,11 @@ public class QuotaRepairTask {
       throw new Exception(ex.getCause());
     }
     
-    // persist bucket info
+    // update count to bucket info
     updateCountToBucketInfo(nameBucketInfoMap, keyCountMap);
     updateCountToBucketInfo(idBucketInfoMap, fileCountMap);
     updateCountToBucketInfo(idBucketInfoMap, directoryCountMap);
-    
-    // update quota enable flag for old volume and buckets
-    updateOldBucketQuotaSupport();
-
-    try (BatchOperation batchOperation = metadataManager.getStore()
-        .initBatchOperation()) {
-      for (Map.Entry<String, OmBucketInfo> entry
-          : nameBucketInfoMap.entrySet()) {
-        String bucketKey = metadataManager.getBucketKey(
-            entry.getValue().getVolumeName(),
-            entry.getValue().getBucketName());
-        metadataManager.getBucketTable().putWithBatch(batchOperation,
-            bucketKey, entry.getValue());
-      }
-      metadataManager.getStore().commitBatchOperation(batchOperation);
-    }
-    LOG.info("Completed quota repair for all keys, files and directories");
-  }
-
-  private void updateOldBucketQuotaSupport() {
-    for (Map.Entry<String, OmBucketInfo> entry : nameBucketInfoMap.entrySet()) {
-      if (entry.getValue().getQuotaInBytes() == OLD_QUOTA_DEFAULT
-          || entry.getValue().getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
-        OmBucketInfo.Builder builder = entry.getValue().toBuilder();
-        if (entry.getValue().getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
-          builder.setQuotaInBytes(QUOTA_RESET);
-        }
-        if (entry.getValue().getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
-          builder.setQuotaInNamespace(QUOTA_RESET);
-        }
-        OmBucketInfo bucketInfo = builder.build();
-        entry.setValue(bucketInfo);
-        
-        // there is a new value to be updated in bucket cache
-        String bucketKey = metadataManager.getBucketKey(
-            bucketInfo.getVolumeName(), bucketInfo.getBucketName());
-        metadataManager.getBucketTable().addCacheEntry(
-            new CacheKey<>(bucketKey),
-            CacheValue.get(EPOCH_DEFAULT, bucketInfo));
-      }
-    }
+    LOG.info("Completed quota repair counting for all keys, files and directories");
   }
 
   private <VALUE> void recalculateUsages(
@@ -315,7 +360,7 @@ public class QuotaRepairTask {
     }
   }
   
-  private <VALUE> void captureCount(
+  private static <VALUE> void captureCount(
       Map<String, CountPair> prefixUsageMap,
       BlockingQueue<List<Table.KeyValue<String, VALUE>>> q,
       AtomicBoolean isRunning, boolean haveValue) throws UncheckedIOException {
@@ -334,7 +379,7 @@ public class QuotaRepairTask {
     }
   }
   
-  private <VALUE> void extractCount(
+  private static <VALUE> void extractCount(
       Table.KeyValue<String, VALUE> kv,
       Map<String, CountPair> prefixUsageMap,
       boolean haveValue) {
@@ -357,7 +402,7 @@ public class QuotaRepairTask {
     }
   }
   
-  private synchronized void updateCountToBucketInfo(
+  private static synchronized void updateCountToBucketInfo(
       Map<String, OmBucketInfo> bucketInfoMap,
       Map<String, CountPair> prefixUsageMap) {
     for (Map.Entry<String, CountPair> entry : prefixUsageMap.entrySet()) {
@@ -370,7 +415,7 @@ public class QuotaRepairTask {
     }
   }
   
-  private String getVolumeBucketPrefix(String key) {
+  private static String getVolumeBucketPrefix(String key) {
     // get bucket prefix with /<volume>/<bucket>/ 
     // -- as represents name in OBS and id in FSO
     String prefix = key;
@@ -404,4 +449,66 @@ public class QuotaRepairTask {
       return namespace.get();
     }
   }
+
+  /**
+   * Repair status for last run.
+   */
+  public static class RepairStatus {
+    private boolean isTriggered = false;
+    private long taskId = 0;
+    private long lastRunStartTime = 0;
+    private long lastRunFinishedTime = 0;
+    private String errorMsg = null;
+    private Map<String, Map<String, Long>> bucketCountDiffMap = new ConcurrentHashMap<>();
+
+    @Override
+    public String toString() {
+      if (!isTriggered) {
+        return "{}";
+      }
+      Map<String, Object> status = new HashMap<>();
+      status.put("taskId", taskId);
+      status.put("lastRunStartTime", lastRunStartTime);
+      status.put("lastRunFinishedTime", lastRunFinishedTime);
+      status.put("errorMsg", errorMsg);
+      status.put("bucketCountDiffMap", bucketCountDiffMap);
+      try {
+        return new ObjectMapper().writeValueAsString(status);
+      } catch (IOException e) {
+        LOG.error("error in generating status", e);
+        return "{}";
+      }
+    }
+
+    public void updateStatus(OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder,
+                             OMMetadataManager metadataManager) {
+      isTriggered = true;
+      lastRunFinishedTime = System.currentTimeMillis();
+      errorMsg = "";
+      bucketCountDiffMap.clear();
+      for (OzoneManagerProtocolProtos.BucketQuotaCount quotaCount : builder.getBucketCountList()) {
+        String bucketKey = metadataManager.getBucketKey(quotaCount.getVolName(), quotaCount.getBucketName());
+        ConcurrentHashMap<String, Long> diffCountMap = new ConcurrentHashMap<>();
+        diffCountMap.put("DiffUsedBytes", quotaCount.getDiffUsedBytes());
+        diffCountMap.put("DiffUsedNamespace", quotaCount.getDiffUsedNamespace());
+        bucketCountDiffMap.put(bucketKey, diffCountMap);
+      }
+    }
+
+    public void updateStatus(String errMsg) {
+      isTriggered = true;
+      lastRunFinishedTime = System.currentTimeMillis();
+      errorMsg = errMsg;
+      bucketCountDiffMap.clear();
+    }
+
+    public void reset(long tskId) {
+      isTriggered = true;
+      taskId = tskId;
+      lastRunStartTime = System.currentTimeMillis();
+      lastRunFinishedTime = 0;
+      errorMsg = "";
+      bucketCountDiffMap.clear();
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
index 8a2e4f550e..446c7382d5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
@@ -21,26 +21,39 @@ package org.apache.hadoop.ozone.om.upgrade;
 
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.QUOTA;
-import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FIRST_UPGRADE_START;
+import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
 
 /**
- * Quota repair for usages action to be triggered during first upgrade.
+ * Quota repair for usages action to be triggered after upgrade.
  */
-@UpgradeActionOm(type = ON_FIRST_UPGRADE_START, feature =
-    QUOTA)
+@UpgradeActionOm(type = ON_FINALIZE, feature = QUOTA)
 public class QuotaRepairUpgradeAction implements OmUpgradeAction {
+  private static final Logger LOG = LoggerFactory.getLogger(QuotaRepairUpgradeAction.class);
   @Override
   public void execute(OzoneManager arg) throws Exception {
     boolean enabled = arg.getConfiguration().getBoolean(
         OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE,
         OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE_DEFAULT);
     if (enabled) {
-      QuotaRepairTask quotaRepairTask = new QuotaRepairTask(
-          arg.getMetadataManager());
-      quotaRepairTask.repair();
+      // just trigger quota repair and status can be checked via CLI
+      try {
+        if (arg.isRatisEnabled()) {
+          arg.checkLeaderStatus();
+        }
+        QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg);
+        quotaRepairTask.repair();
+      } catch (OMNotLeaderException | OMLeaderNotReadyException ex) {
+        // on leader node, repair will be triggered where finalize is called. For other nodes, it will be ignored.
+        // This can be triggered on need basis via CLI tool.
+        LOG.warn("Skip quota repair operation during upgrade on the node as this is not a leader node.");
+      }
     }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
index 1a0db11833..06b8beacb3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
@@ -22,8 +22,16 @@ package org.apache.hadoop.ozone.om.service;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -32,6 +40,11 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.request.key.TestOMKeyRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMQuotaRepairResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.util.Time;
 import org.junit.jupiter.api.Test;
 
@@ -44,6 +57,16 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
 
   @Test
   public void testQuotaRepair() throws Exception {
+    when(ozoneManager.isRatisEnabled()).thenReturn(false);
+    OzoneManagerProtocolProtos.OMResponse respMock = mock(OzoneManagerProtocolProtos.OMResponse.class);
+    when(respMock.getSuccess()).thenReturn(true);
+    OzoneManagerProtocolServerSideTranslatorPB serverMock = mock(OzoneManagerProtocolServerSideTranslatorPB.class);
+    AtomicReference<OzoneManagerProtocolProtos.OMRequest> ref = new AtomicReference<>();
+    doAnswer(invocation -> {
+      ref.set(invocation.getArgument(1, OzoneManagerProtocolProtos.OMRequest.class));
+      return respMock;
+    }).when(serverMock).submitRequest(any(), any());
+    when(ozoneManager.getOmServerProtocol()).thenReturn(serverMock);
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager, BucketLayout.OBJECT_STORE);
 
@@ -88,9 +111,16 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
     assertEquals(0, fsoBucketInfo.getUsedNamespace());
     assertEquals(0, fsoBucketInfo.getUsedBytes());
     
-    QuotaRepairTask quotaRepairTask = new QuotaRepairTask(omMetadataManager);
-    quotaRepairTask.repair();
-
+    QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager);
+    CompletableFuture<Boolean> repair = quotaRepairTask.repair();
+    Boolean repairStatus = repair.get();
+    assertTrue(repairStatus);
+
+    OMQuotaRepairRequest omQuotaRepairRequest = new OMQuotaRepairRequest(ref.get());
+    OMClientResponse omClientResponse = omQuotaRepairRequest.validateAndUpdateCache(ozoneManager, 1);
+    BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation();
+    ((OMQuotaRepairResponse)omClientResponse).addToDBBatch(omMetadataManager, batchOperation);
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
     // 10 files of each type, obs have replication of three and
     // fso have replication of one
     OmBucketInfo obsUpdateBucketInfo = omMetadataManager.getBucketTable().get(
@@ -105,6 +135,16 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
 
   @Test
   public void testQuotaRepairForOldVersionVolumeBucket() throws Exception {
+    when(ozoneManager.isRatisEnabled()).thenReturn(false);
+    OzoneManagerProtocolProtos.OMResponse respMock = mock(OzoneManagerProtocolProtos.OMResponse.class);
+    when(respMock.getSuccess()).thenReturn(true);
+    OzoneManagerProtocolServerSideTranslatorPB serverMock = mock(OzoneManagerProtocolServerSideTranslatorPB.class);
+    AtomicReference<OzoneManagerProtocolProtos.OMRequest> ref = new AtomicReference<>();
+    doAnswer(invocation -> {
+      ref.set(invocation.getArgument(1, OzoneManagerProtocolProtos.OMRequest.class));
+      return respMock;
+    }).when(serverMock).submitRequest(any(), any());
+    when(ozoneManager.getOmServerProtocol()).thenReturn(serverMock);
     // add volume with -2 value
     OmVolumeArgs omVolumeArgs =
         OmVolumeArgs.newBuilder().setCreationTime(Time.now())
@@ -117,13 +157,14 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
         new CacheKey<>(omMetadataManager.getVolumeKey(volumeName)),
         CacheValue.get(1L, omVolumeArgs));
     
-    // add bucket with -2 value
+    // add bucket with -2 value and add to db
     OMRequestTestUtils.addBucketToDB(volumeName, bucketName,
         omMetadataManager, -2);
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    omMetadataManager.getBucketTable().put(bucketKey, omMetadataManager.getBucketTable().get(bucketKey));
 
     // pre check for quota flag
-    OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
-        omMetadataManager.getBucketKey(volumeName, bucketName));
+    OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
     assertEquals(-2, bucketInfo.getQuotaInBytes());
     
     omVolumeArgs = omMetadataManager.getVolumeTable().get(
@@ -131,11 +172,18 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
     assertEquals(-2, omVolumeArgs.getQuotaInBytes());
     assertEquals(-2, omVolumeArgs.getQuotaInNamespace());
 
-    QuotaRepairTask quotaRepairTask = new QuotaRepairTask(omMetadataManager);
-    quotaRepairTask.repair();
+    QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager);
+    CompletableFuture<Boolean> repair = quotaRepairTask.repair();
+    Boolean repairStatus = repair.get();
+    assertTrue(repairStatus);
 
+    OMQuotaRepairRequest omQuotaRepairRequest = new OMQuotaRepairRequest(ref.get());
+    OMClientResponse omClientResponse = omQuotaRepairRequest.validateAndUpdateCache(ozoneManager, 1);
+    BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation();
+    ((OMQuotaRepairResponse)omClientResponse).addToDBBatch(omMetadataManager, batchOperation);
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
     bucketInfo = omMetadataManager.getBucketTable().get(
-        omMetadataManager.getBucketKey(volumeName, bucketName));
+        bucketKey);
     assertEquals(-1, bucketInfo.getQuotaInBytes());
     OmVolumeArgs volArgsVerify = omMetadataManager.getVolumeTable()
         .get(omMetadataManager.getVolumeKey(volumeName));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to