This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 86ccc48495 HDDS-11284. refactor quota repair non-blocking while
upgrade (#7035)
86ccc48495 is described below
commit 86ccc48495cf887e0e9ef302d1ba70bf06b22d98
Author: Sumit Agrawal <[email protected]>
AuthorDate: Wed Aug 21 12:50:19 2024 +0530
HDDS-11284. refactor quota repair non-blocking while upgrade (#7035)
---
.../org/apache/hadoop/hdds/utils/db/DBStore.java | 8 +
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 8 +
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 1 +
.../src/main/proto/OmClientProtocol.proto | 21 +-
.../om/ratis/utils/OzoneManagerRatisUtils.java | 3 +
.../om/request/volume/OMQuotaRepairRequest.java | 191 ++++++++++
.../om/response/volume/OMQuotaRepairResponse.java | 72 ++++
.../hadoop/ozone/om/service/QuotaRepairTask.java | 401 +++++++++++++--------
.../ozone/om/upgrade/QuotaRepairUpgradeAction.java | 27 +-
.../ozone/om/service/TestQuotaRepairTask.java | 68 +++-
10 files changed, 633 insertions(+), 167 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 1985562523..3e8ea30a65 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -167,6 +167,14 @@ public interface DBStore extends Closeable,
BatchOperationHandler {
*/
DBCheckpoint getCheckpoint(boolean flush) throws IOException;
+ /**
+ * Get current snapshot of DB store as an artifact stored on
+ * the local filesystem with different parent path.
+ * @return An object that encapsulates the checkpoint information along with
+ * location.
+ */
+ DBCheckpoint getCheckpoint(String parentDir, boolean flush) throws
IOException;
+
/**
* Get DB Store location.
* @return DB file location.
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index d5aa961b0e..99924f724d 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -338,6 +338,14 @@ public class RDBStore implements DBStore {
return checkPointManager.createCheckpoint(checkpointsParentDir);
}
+ @Override
+ public DBCheckpoint getCheckpoint(String parentPath, boolean flush) throws
IOException {
+ if (flush) {
+ this.flushDB();
+ }
+ return checkPointManager.createCheckpoint(parentPath, null);
+ }
+
public DBCheckpoint getSnapshot(String name) throws IOException {
this.flushLog(true);
return checkPointManager.createCheckpoint(snapshotsParentDir, name);
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 82030669c9..bf27d7afb6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -327,6 +327,7 @@ public final class OmUtils {
case SetTimes:
case AbortExpiredMultiPartUploads:
case SetSnapshotProperty:
+ case QuotaRepair:
case UnknownCommand:
return false;
case EchoRPC:
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 32bba26608..4bdfa97b93 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -147,8 +147,8 @@ enum Type {
ListStatusLight = 129;
GetSnapshotInfo = 130;
RenameSnapshot = 131;
-
ListOpenFiles = 132;
+ QuotaRepair = 133;
}
enum SafeMode {
@@ -285,8 +285,8 @@ message OMRequest {
optional SetSnapshotPropertyRequest SetSnapshotPropertyRequest =
127;
optional SnapshotInfoRequest SnapshotInfoRequest =
128;
optional RenameSnapshotRequest RenameSnapshotRequest =
129;
-
optional ListOpenFilesRequest ListOpenFilesRequest =
130;
+ optional QuotaRepairRequest QuotaRepairRequest =
131;
}
message OMResponse {
@@ -410,8 +410,8 @@ message OMResponse {
optional SnapshotInfoResponse SnapshotInfoResponse =
130;
optional OMLockDetailsProto omLockDetails =
131;
optional RenameSnapshotResponse RenameSnapshotResponse =
132;
-
optional ListOpenFilesResponse ListOpenFilesResponse =
133;
+ optional QuotaRepairResponse QuotaRepairResponse = 134;
}
enum Status {
@@ -2187,6 +2187,21 @@ message SetSafeModeResponse {
optional bool response = 1;
}
+message QuotaRepairRequest {
+ repeated BucketQuotaCount bucketCount = 1;
+ required bool supportVolumeOldQuota = 2 [default=false];
+}
+message BucketQuotaCount {
+ required string volName = 1;
+ required string bucketName = 2;
+ required int64 diffUsedBytes = 3;
+ required int64 diffUsedNamespace = 4;
+ required bool supportOldQuota = 5 [default=false];
+}
+
+message QuotaRepairResponse {
+}
+
message OMLockDetailsProto {
optional bool isLockAcquired = 1;
optional uint64 waitLockNanos = 2;
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 8ff59e091d..5dc640c742 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -84,6 +84,7 @@ import
org.apache.hadoop.ozone.om.request.upgrade.OMCancelPrepareRequest;
import org.apache.hadoop.ozone.om.request.upgrade.OMFinalizeUpgradeRequest;
import org.apache.hadoop.ozone.om.request.upgrade.OMPrepareRequest;
import org.apache.hadoop.ozone.om.request.util.OMEchoRPCWriteRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
@@ -331,6 +332,8 @@ public final class OzoneManagerRatisUtils {
return new OMEchoRPCWriteRequest(omRequest);
case AbortExpiredMultiPartUploads:
return new S3ExpiredMultipartUploadsAbortRequest(omRequest);
+ case QuotaRepair:
+ return new OMQuotaRepairRequest(omRequest);
default:
throw new OMException("Unrecognized write command type request "
+ cmdType, OMException.ResultCodes.INVALID_REQUEST);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
new file mode 100644
index 0000000000..e307a1f95f
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMQuotaRepairResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OLD_QUOTA_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.QUOTA_RESET;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Handle OMQuotaRepairRequest Request.
+ */
+public class OMQuotaRepairRequest extends OMClientRequest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OMQuotaRepairRequest.class);
+
+ public OMQuotaRepairRequest(OMRequest omRequest) {
+ super(omRequest);
+ }
+
+ @Override
+ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+ UserGroupInformation ugi = createUGIForApi();
+ if (ozoneManager.getAclsEnabled() && !ozoneManager.isAdmin(ugi)) {
+ throw new OMException("Access denied for user " + ugi + ". Admin
privilege is required for quota repair.",
+ OMException.ResultCodes.ACCESS_DENIED);
+ }
+ return super.preExecute(ozoneManager);
+ }
+
+ @Override
+ @SuppressWarnings("methodlength")
+ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
TermIndex termIndex) {
+ final long transactionLogIndex = termIndex.getIndex();
+ OzoneManagerProtocolProtos.QuotaRepairRequest quotaRepairRequest =
+ getOmRequest().getQuotaRepairRequest();
+ Preconditions.checkNotNull(quotaRepairRequest);
+
+ OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+ OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
OmResponseUtil.getOMResponseBuilder(getOmRequest());
+ Map<Pair<String, String>, OmBucketInfo> bucketMap = new HashMap<>();
+ OMClientResponse omClientResponse = null;
+ try {
+ for (int i = 0; i < quotaRepairRequest.getBucketCountCount(); ++i) {
+ OzoneManagerProtocolProtos.BucketQuotaCount bucketCountInfo =
quotaRepairRequest.getBucketCount(i);
+ updateBucketInfo(omMetadataManager, bucketCountInfo,
transactionLogIndex, bucketMap);
+ }
+ Map<String, OmVolumeArgs> volUpdateMap;
+ if (quotaRepairRequest.getSupportVolumeOldQuota()) {
+ volUpdateMap = updateOldVolumeQuotaSupport(omMetadataManager,
transactionLogIndex);
+ } else {
+ volUpdateMap = Collections.emptyMap();
+ }
+ omResponse.setQuotaRepairResponse(
+ OzoneManagerProtocolProtos.QuotaRepairResponse.newBuilder().build());
+ omClientResponse = new OMQuotaRepairResponse(omResponse.build(),
volUpdateMap, bucketMap);
+ } catch (IOException ex) {
+ LOG.error("failed to update repair count", ex);
+ omClientResponse = new
OMQuotaRepairResponse(createErrorOMResponse(omResponse, ex));
+ } finally {
+ if (omClientResponse != null) {
+ omClientResponse.setOmLockDetails(getOmLockDetails());
+ }
+ }
+
+ return omClientResponse;
+ }
+
+ private void updateBucketInfo(
+ OMMetadataManager omMetadataManager,
OzoneManagerProtocolProtos.BucketQuotaCount bucketCountInfo,
+ long transactionLogIndex, Map<Pair<String, String>, OmBucketInfo>
bucketMap) throws IOException {
+ // acquire lock.
+ mergeOmLockDetails(omMetadataManager.getLock().acquireWriteLock(
+ BUCKET_LOCK, bucketCountInfo.getVolName(),
bucketCountInfo.getBucketName()));
+ boolean acquiredBucketLock = getOmLockDetails().isLockAcquired();
+ try {
+ String bucketKey =
omMetadataManager.getBucketKey(bucketCountInfo.getVolName(),
+ bucketCountInfo.getBucketName());
+ OmBucketInfo bucketInfo =
omMetadataManager.getBucketTable().get(bucketKey);
+ if (null == bucketInfo) {
+ // bucket might be deleted when running repair count parallel
+ return;
+ }
+ bucketInfo.incrUsedBytes(bucketCountInfo.getDiffUsedBytes());
+ bucketInfo.incrUsedNamespace(bucketCountInfo.getDiffUsedNamespace());
+ if (bucketCountInfo.getSupportOldQuota()) {
+ OmBucketInfo.Builder builder = bucketInfo.toBuilder();
+ if (bucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
+ builder.setQuotaInBytes(QUOTA_RESET);
+ }
+ if (bucketInfo.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
+ builder.setQuotaInNamespace(QUOTA_RESET);
+ }
+ bucketInfo = builder.build();
+ }
+
+ omMetadataManager.getBucketTable().addCacheEntry(
+ new CacheKey<>(bucketKey), CacheValue.get(transactionLogIndex,
bucketInfo));
+ bucketMap.put(Pair.of(bucketCountInfo.getVolName(),
bucketCountInfo.getBucketName()), bucketInfo);
+ } finally {
+ if (acquiredBucketLock) {
+ mergeOmLockDetails(omMetadataManager.getLock()
+ .releaseWriteLock(BUCKET_LOCK, bucketCountInfo.getVolName(),
bucketCountInfo.getBucketName()));
+ }
+ }
+ }
+
+ private Map<String, OmVolumeArgs> updateOldVolumeQuotaSupport(
+ OMMetadataManager metadataManager, long transactionLogIndex) throws
IOException {
+ LOG.info("Starting volume quota support update");
+ Map<String, OmVolumeArgs> volUpdateMap = new HashMap<>();
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
+ iterator = metadataManager.getVolumeTable().iterator()) {
+ while (iterator.hasNext()) {
+ Table.KeyValue<String, OmVolumeArgs> entry = iterator.next();
+ OmVolumeArgs omVolumeArgs = entry.getValue();
+ if (!(omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT
+ || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT)) {
+ continue;
+ }
+ mergeOmLockDetails(metadataManager.getLock().acquireWriteLock(
+ VOLUME_LOCK, omVolumeArgs.getVolume()));
+ boolean acquiredVolumeLock = getOmLockDetails().isLockAcquired();
+ try {
+ boolean isQuotaReset = false;
+ if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
+ omVolumeArgs.setQuotaInBytes(QUOTA_RESET);
+ isQuotaReset = true;
+ }
+ if (omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
+ omVolumeArgs.setQuotaInNamespace(QUOTA_RESET);
+ isQuotaReset = true;
+ }
+ if (isQuotaReset) {
+ metadataManager.getVolumeTable().addCacheEntry(
+ new CacheKey<>(entry.getKey()),
CacheValue.get(transactionLogIndex, omVolumeArgs));
+ volUpdateMap.put(entry.getKey(), omVolumeArgs);
+ }
+ } finally {
+ if (acquiredVolumeLock) {
+
mergeOmLockDetails(metadataManager.getLock().releaseWriteLock(VOLUME_LOCK,
omVolumeArgs.getVolume()));
+ }
+ }
+ }
+ }
+ LOG.info("Completed volume quota support update for volume count {}",
volUpdateMap.size());
+ return volUpdateMap;
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java
new file mode 100644
index 0000000000..8fa028d743
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+
+/**
+ * Response for {@link OMQuotaRepairRequest} request.
+ */
+@CleanupTableInfo(cleanupTables = {VOLUME_TABLE, BUCKET_TABLE})
+public class OMQuotaRepairResponse extends OMClientResponse {
+ private Map<String, OmVolumeArgs> volumeArgsMap;
+ private Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap;
+
+ /**
+ * for quota failure response update.
+ */
+ public OMQuotaRepairResponse(@Nonnull OMResponse omResponse) {
+ super(omResponse);
+ }
+
+ public OMQuotaRepairResponse(
+ @Nonnull OMResponse omResponse, Map<String, OmVolumeArgs> volumeArgsMap,
+ Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap) {
+ super(omResponse);
+ this.volBucketInfoMap = volBucketInfoMap;
+ this.volumeArgsMap = volumeArgsMap;
+ }
+
+ @Override
+ public void addToDBBatch(OMMetadataManager metadataManager,
+ BatchOperation batchOp) throws IOException {
+ for (OmBucketInfo omBucketInfo : volBucketInfoMap.values()) {
+ metadataManager.getBucketTable().putWithBatch(batchOp,
+ metadataManager.getBucketKey(omBucketInfo.getVolumeName(),
+ omBucketInfo.getBucketName()), omBucketInfo);
+ }
+ for (OmVolumeArgs volArgs : volumeArgsMap.values()) {
+ metadataManager.getVolumeTable().putWithBatch(batchOp,
volArgs.getVolume(), volArgs);
+ }
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
index 8a8ebd06f4..b3e64c98c5 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
@@ -20,14 +20,18 @@
package org.apache.hadoop.ozone.om.service;
import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.protobuf.ServiceException;
+import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -36,25 +40,29 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.OzoneConsts.OLD_QUOTA_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.QUOTA_RESET;
-import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
-import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
/**
* Quota repair task.
@@ -64,117 +72,189 @@ public class QuotaRepairTask {
QuotaRepairTask.class);
private static final int BATCH_SIZE = 5000;
private static final int TASK_THREAD_CNT = 3;
- public static final long EPOCH_DEFAULT = -1L;
- private final OMMetadataManager metadataManager;
- private final Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
- private final Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
+ private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false);
+ private static final RepairStatus REPAIR_STATUS = new RepairStatus();
+ private final OzoneManager om;
+ private final AtomicLong runCount = new AtomicLong(0);
private ExecutorService executor;
- private final Map<String, CountPair> keyCountMap = new ConcurrentHashMap<>();
- private final Map<String, CountPair> fileCountMap
- = new ConcurrentHashMap<>();
- private final Map<String, CountPair> directoryCountMap
- = new ConcurrentHashMap<>();
- private final Map<String, String> oldVolumeKeyNameMap = new HashMap();
+ public QuotaRepairTask(OzoneManager ozoneManager) {
+ this.om = ozoneManager;
+ }
- public QuotaRepairTask(OMMetadataManager metadataManager) {
- this.metadataManager = metadataManager;
+ public CompletableFuture<Boolean> repair() throws Exception {
+ // lock in progress operation and reject any other
+ if (!IN_PROGRESS.compareAndSet(false, true)) {
+ LOG.info("quota repair task already running");
+ return CompletableFuture.supplyAsync(() -> false);
+ }
+ REPAIR_STATUS.reset(runCount.get() + 1);
+ return CompletableFuture.supplyAsync(() -> repairTask());
}
-
- public void repair() throws Exception {
- LOG.info("Starting quota repair task");
- prepareAllVolumeBucketInfo();
- IOzoneManagerLock lock = metadataManager.getLock();
- // thread pool with 3 Table type * (1 task each + 3 thread each)
- executor = Executors.newFixedThreadPool(12);
+ public static String getStatus() {
+ return REPAIR_STATUS.toString();
+ }
+ private boolean repairTask() {
+ LOG.info("Starting quota repair task {}", REPAIR_STATUS);
+ OMMetadataManager activeMetaManager = null;
try {
- nameBucketInfoMap.values().stream().forEach(e -> lock.acquireReadLock(
- BUCKET_LOCK, e.getVolumeName(), e.getBucketName()));
- repairCount();
+ // thread pool with 3 Table type * (1 task each + 3 thread for each task)
+ executor = Executors.newFixedThreadPool(3 * (1 + TASK_THREAD_CNT));
+ OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder
+ = OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
+ // repair active db
+ activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(),
om.getConfiguration());
+ repairActiveDb(activeMetaManager, builder);
+
+ // TODO: repair snapshots for quota
+
+ // submit request to update
+ ClientId clientId = ClientId.randomId();
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.QuotaRepair)
+ .setQuotaRepairRequest(builder.build())
+ .setClientId(clientId.toString())
+ .build();
+ OzoneManagerProtocolProtos.OMResponse response =
submitRequest(omRequest, clientId);
+ if (response != null && !response.getSuccess()) {
+ LOG.error("update quota repair count response failed");
+ REPAIR_STATUS.updateStatus("Response for update DB is failed");
+ return false;
+ } else {
+ REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
+ }
+ } catch (Exception exp) {
+ LOG.error("quota repair count failed", exp);
+ REPAIR_STATUS.updateStatus(exp.toString());
+ return false;
} finally {
- nameBucketInfoMap.values().stream().forEach(e -> lock.releaseReadLock(
- BUCKET_LOCK, e.getVolumeName(), e.getBucketName()));
+ LOG.info("Completed quota repair task {}", REPAIR_STATUS);
executor.shutdown();
- LOG.info("Completed quota repair task");
+ try {
+ if (null != activeMetaManager) {
+ activeMetaManager.stop();
+ }
+ cleanTempCheckPointPath(om.getMetadataManager());
+ } catch (Exception exp) {
+ LOG.error("failed to cleanup", exp);
+ }
+ IN_PROGRESS.set(false);
}
- updateOldVolumeQuotaSupport();
-
- // cleanup epoch added to avoid extra epoch id in cache
- ArrayList<Long> epochs = new ArrayList<>();
- epochs.add(EPOCH_DEFAULT);
- metadataManager.getBucketTable().cleanupCache(epochs);
- metadataManager.getVolumeTable().cleanupCache(epochs);
+ return true;
}
-
- private void prepareAllVolumeBucketInfo() throws IOException {
- try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
- iterator = metadataManager.getVolumeTable().iterator()) {
- OmVolumeArgs omVolumeArgs;
- while (iterator.hasNext()) {
- Table.KeyValue<String, OmVolumeArgs> entry =
- iterator.next();
- omVolumeArgs = entry.getValue();
- getAllBuckets(omVolumeArgs.getVolume(), omVolumeArgs.getObjectID());
- if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT
- || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
- oldVolumeKeyNameMap.put(entry.getKey(),
entry.getValue().getVolume());
- }
+ private void repairActiveDb(
+ OMMetadataManager metadataManager,
+ OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder) throws
Exception {
+ Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
+ Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
+ Map<String, OmBucketInfo> oriBucketInfoMap = new HashMap<>();
+ prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap,
metadataManager);
+
+ repairCount(nameBucketInfoMap, idBucketInfoMap, metadataManager);
+
+ // prepare and submit request to ratis
+ for (Map.Entry<String, OmBucketInfo> entry : nameBucketInfoMap.entrySet())
{
+ OmBucketInfo oriBucketInfo = oriBucketInfoMap.get(entry.getKey());
+ OmBucketInfo updatedBuckedInfo = entry.getValue();
+ boolean oldQuota = oriBucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT
+ || oriBucketInfo.getQuotaInNamespace() == OLD_QUOTA_DEFAULT;
+ if (!(oldQuota || isChange(oriBucketInfo, updatedBuckedInfo))) {
+ continue;
}
+ OzoneManagerProtocolProtos.BucketQuotaCount.Builder bucketCountBuilder
+ = OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder();
+ bucketCountBuilder.setVolName(updatedBuckedInfo.getVolumeName());
+ bucketCountBuilder.setBucketName(updatedBuckedInfo.getBucketName());
+ bucketCountBuilder.setDiffUsedBytes(updatedBuckedInfo.getUsedBytes() -
oriBucketInfo.getUsedBytes());
+ bucketCountBuilder.setDiffUsedNamespace(
+ updatedBuckedInfo.getUsedNamespace() -
oriBucketInfo.getUsedNamespace());
+ bucketCountBuilder.setSupportOldQuota(oldQuota);
+ builder.addBucketCount(bucketCountBuilder.build());
}
+
+ // update volume to support quota
+ builder.setSupportVolumeOldQuota(true);
}
- private void updateOldVolumeQuotaSupport() throws IOException {
- LOG.info("Starting volume quota support update");
- IOzoneManagerLock lock = metadataManager.getLock();
- try (BatchOperation batchOperation = metadataManager.getStore()
- .initBatchOperation()) {
- for (Map.Entry<String, String> volEntry
- : oldVolumeKeyNameMap.entrySet()) {
- lock.acquireReadLock(VOLUME_LOCK, volEntry.getValue());
- try {
- OmVolumeArgs omVolumeArgs = metadataManager.getVolumeTable().get(
- volEntry.getKey());
- boolean isQuotaReset = false;
- if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
- omVolumeArgs.setQuotaInBytes(QUOTA_RESET);
- isQuotaReset = true;
- }
- if (omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
- omVolumeArgs.setQuotaInNamespace(QUOTA_RESET);
- isQuotaReset = true;
- }
- if (isQuotaReset) {
- metadataManager.getVolumeTable().addCacheEntry(
- new CacheKey<>(volEntry.getKey()),
- CacheValue.get(EPOCH_DEFAULT, omVolumeArgs));
- metadataManager.getVolumeTable().putWithBatch(batchOperation,
- volEntry.getKey(), omVolumeArgs);
- }
- } finally {
- lock.releaseReadLock(VOLUME_LOCK, volEntry.getValue());
- }
+ private OzoneManagerProtocolProtos.OMResponse submitRequest(
+ OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) {
+ try {
+ if (om.isRatisEnabled()) {
+ OzoneManagerRatisServer server = om.getOmRatisServer();
+ RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+ .setClientId(clientId)
+ .setServerId(om.getOmRatisServer().getRaftPeerId())
+ .setGroupId(om.getOmRatisServer().getRaftGroupId())
+ .setCallId(runCount.getAndIncrement())
+
.setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)))
+ .setType(RaftClientRequest.writeRequestType())
+ .build();
+ return server.submitRequest(omRequest, raftClientRequest);
+ } else {
+ return om.getOmServerProtocol().submitRequest(
+ null, omRequest);
}
- metadataManager.getStore().commitBatchOperation(batchOperation);
+ } catch (ServiceException e) {
+ LOG.error("repair quota count " + omRequest.getCmdType() + " request
failed.", e);
}
- LOG.info("Completed volume quota support update");
+ return null;
}
- private void getAllBuckets(String volumeName, long volumeId)
- throws IOException {
- List<OmBucketInfo> bucketList = metadataManager.listBuckets(
- volumeName, null, null, Integer.MAX_VALUE, false);
- for (OmBucketInfo bucketInfo : bucketList) {
- bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
- bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
- nameBucketInfoMap.put(buildNamePath(volumeName,
- bucketInfo.getBucketName()), bucketInfo);
- idBucketInfoMap.put(buildIdPath(volumeId, bucketInfo.getObjectID()),
- bucketInfo);
+ private OMMetadataManager createActiveDBCheckpoint(
+ OMMetadataManager omMetaManager, OzoneConfiguration conf) throws
IOException {
+ // cleanup
+ String parentPath = cleanTempCheckPointPath(omMetaManager);
+
+ // create snapshot
+ DBCheckpoint checkpoint =
omMetaManager.getStore().getCheckpoint(parentPath, true);
+ return OmMetadataManagerImpl.createCheckpointMetadataManager(conf,
checkpoint);
+ }
+
+ private static String cleanTempCheckPointPath(OMMetadataManager
omMetaManager) throws IOException {
+ File dbLocation = omMetaManager.getStore().getDbLocation();
+ if (dbLocation == null) {
+ throw new NullPointerException("db location is null");
+ }
+ String tempData = dbLocation.getParent();
+ if (tempData == null) {
+ throw new NullPointerException("parent db dir is null");
+ }
+ File repairTmpPath = Paths.get(tempData, "temp-repair-quota").toFile();
+ FileUtils.deleteDirectory(repairTmpPath);
+ FileUtils.forceMkdir(repairTmpPath);
+ return repairTmpPath.toString();
+ }
+
+ private void prepareAllBucketInfo(
+ Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo>
idBucketInfoMap,
+ Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager
metadataManager) throws IOException {
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
+ iterator = metadataManager.getBucketTable().iterator()) {
+ while (iterator.hasNext()) {
+ Table.KeyValue<String, OmBucketInfo> entry = iterator.next();
+ OmBucketInfo bucketInfo = entry.getValue();
+ String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(),
+ bucketInfo.getBucketName());
+ oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
+ bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
+ bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
+ nameBucketInfoMap.put(bucketNameKey, bucketInfo);
+
idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
+ bucketInfo.getObjectID()), bucketInfo);
+ }
+ }
+ }
+
+ private boolean isChange(OmBucketInfo lBucketInfo, OmBucketInfo rBucketInfo)
{
+ if (lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace()
+ || lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes()) {
+ return true;
}
+ return false;
}
- private String buildNamePath(String volumeName, String bucketName) {
+ private static String buildNamePath(String volumeName, String bucketName) {
final StringBuilder builder = new StringBuilder();
builder.append(OM_KEY_PREFIX)
.append(volumeName)
@@ -184,7 +264,7 @@ public class QuotaRepairTask {
return builder.toString();
}
- private String buildIdPath(long volumeId, long bucketId) {
+ private static String buildIdPath(long volumeId, long bucketId) {
final StringBuilder builder = new StringBuilder();
builder.append(OM_KEY_PREFIX)
.append(volumeId)
@@ -194,8 +274,13 @@ public class QuotaRepairTask {
return builder.toString();
}
- private void repairCount() throws Exception {
- LOG.info("Starting quota repair for all keys, files and directories");
+ private void repairCount(
+ Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo>
idBucketInfoMap,
+ OMMetadataManager metadataManager) throws Exception {
+ LOG.info("Starting quota repair counting for all keys, files and
directories");
+ Map<String, CountPair> keyCountMap = new ConcurrentHashMap<>();
+ Map<String, CountPair> fileCountMap = new ConcurrentHashMap<>();
+ Map<String, CountPair> directoryCountMap = new ConcurrentHashMap<>();
try {
nameBucketInfoMap.keySet().stream().forEach(e -> keyCountMap.put(e,
new CountPair()));
@@ -225,51 +310,11 @@ public class QuotaRepairTask {
throw new Exception(ex.getCause());
}
- // persist bucket info
+ // update count to bucket info
updateCountToBucketInfo(nameBucketInfoMap, keyCountMap);
updateCountToBucketInfo(idBucketInfoMap, fileCountMap);
updateCountToBucketInfo(idBucketInfoMap, directoryCountMap);
-
- // update quota enable flag for old volume and buckets
- updateOldBucketQuotaSupport();
-
- try (BatchOperation batchOperation = metadataManager.getStore()
- .initBatchOperation()) {
- for (Map.Entry<String, OmBucketInfo> entry
- : nameBucketInfoMap.entrySet()) {
- String bucketKey = metadataManager.getBucketKey(
- entry.getValue().getVolumeName(),
- entry.getValue().getBucketName());
- metadataManager.getBucketTable().putWithBatch(batchOperation,
- bucketKey, entry.getValue());
- }
- metadataManager.getStore().commitBatchOperation(batchOperation);
- }
- LOG.info("Completed quota repair for all keys, files and directories");
- }
-
- private void updateOldBucketQuotaSupport() {
- for (Map.Entry<String, OmBucketInfo> entry : nameBucketInfoMap.entrySet())
{
- if (entry.getValue().getQuotaInBytes() == OLD_QUOTA_DEFAULT
- || entry.getValue().getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
- OmBucketInfo.Builder builder = entry.getValue().toBuilder();
- if (entry.getValue().getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
- builder.setQuotaInBytes(QUOTA_RESET);
- }
- if (entry.getValue().getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
- builder.setQuotaInNamespace(QUOTA_RESET);
- }
- OmBucketInfo bucketInfo = builder.build();
- entry.setValue(bucketInfo);
-
- // there is a new value to be updated in bucket cache
- String bucketKey = metadataManager.getBucketKey(
- bucketInfo.getVolumeName(), bucketInfo.getBucketName());
- metadataManager.getBucketTable().addCacheEntry(
- new CacheKey<>(bucketKey),
- CacheValue.get(EPOCH_DEFAULT, bucketInfo));
- }
- }
+ LOG.info("Completed quota repair counting for all keys, files and
directories");
}
private <VALUE> void recalculateUsages(
@@ -315,7 +360,7 @@ public class QuotaRepairTask {
}
}
- private <VALUE> void captureCount(
+ private static <VALUE> void captureCount(
Map<String, CountPair> prefixUsageMap,
BlockingQueue<List<Table.KeyValue<String, VALUE>>> q,
AtomicBoolean isRunning, boolean haveValue) throws UncheckedIOException {
@@ -334,7 +379,7 @@ public class QuotaRepairTask {
}
}
- private <VALUE> void extractCount(
+ private static <VALUE> void extractCount(
Table.KeyValue<String, VALUE> kv,
Map<String, CountPair> prefixUsageMap,
boolean haveValue) {
@@ -357,7 +402,7 @@ public class QuotaRepairTask {
}
}
- private synchronized void updateCountToBucketInfo(
+ private static synchronized void updateCountToBucketInfo(
Map<String, OmBucketInfo> bucketInfoMap,
Map<String, CountPair> prefixUsageMap) {
for (Map.Entry<String, CountPair> entry : prefixUsageMap.entrySet()) {
@@ -370,7 +415,7 @@ public class QuotaRepairTask {
}
}
- private String getVolumeBucketPrefix(String key) {
+ private static String getVolumeBucketPrefix(String key) {
// get bucket prefix with /<volume>/<bucket>/
// -- as represents name in OBS and id in FSO
String prefix = key;
@@ -404,4 +449,66 @@ public class QuotaRepairTask {
return namespace.get();
}
}
+
+ /**
+ * Repair status for last run.
+ */
+ public static class RepairStatus {
+ private boolean isTriggered = false;
+ private long taskId = 0;
+ private long lastRunStartTime = 0;
+ private long lastRunFinishedTime = 0;
+ private String errorMsg = null;
+ private Map<String, Map<String, Long>> bucketCountDiffMap = new
ConcurrentHashMap<>();
+
+ @Override
+ public String toString() {
+ if (!isTriggered) {
+ return "{}";
+ }
+ Map<String, Object> status = new HashMap<>();
+ status.put("taskId", taskId);
+ status.put("lastRunStartTime", lastRunStartTime);
+ status.put("lastRunFinishedTime", lastRunFinishedTime);
+ status.put("errorMsg", errorMsg);
+ status.put("bucketCountDiffMap", bucketCountDiffMap);
+ try {
+ return new ObjectMapper().writeValueAsString(status);
+ } catch (IOException e) {
+ LOG.error("error in generating status", e);
+ return "{}";
+ }
+ }
+
+ public void
updateStatus(OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder,
+ OMMetadataManager metadataManager) {
+ isTriggered = true;
+ lastRunFinishedTime = System.currentTimeMillis();
+ errorMsg = "";
+ bucketCountDiffMap.clear();
+ for (OzoneManagerProtocolProtos.BucketQuotaCount quotaCount :
builder.getBucketCountList()) {
+ String bucketKey =
metadataManager.getBucketKey(quotaCount.getVolName(),
quotaCount.getBucketName());
+ ConcurrentHashMap<String, Long> diffCountMap = new
ConcurrentHashMap<>();
+ diffCountMap.put("DiffUsedBytes", quotaCount.getDiffUsedBytes());
+ diffCountMap.put("DiffUsedNamespace",
quotaCount.getDiffUsedNamespace());
+ bucketCountDiffMap.put(bucketKey, diffCountMap);
+ }
+ }
+
+ public void updateStatus(String errMsg) {
+ isTriggered = true;
+ lastRunFinishedTime = System.currentTimeMillis();
+ errorMsg = errMsg;
+ bucketCountDiffMap.clear();
+ }
+
+ public void reset(long tskId) {
+ isTriggered = true;
+ taskId = tskId;
+ lastRunStartTime = System.currentTimeMillis();
+ lastRunFinishedTime = 0;
+ errorMsg = "";
+ bucketCountDiffMap.clear();
+ }
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
index 8a2e4f550e..446c7382d5 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
@@ -21,26 +21,39 @@ package org.apache.hadoop.ozone.om.upgrade;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.QUOTA;
-import static
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FIRST_UPGRADE_START;
+import static
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
/**
- * Quota repair for usages action to be triggered during first upgrade.
+ * Quota repair action that recalculates usage counters, triggered after upgrade finalization.
*/
-@UpgradeActionOm(type = ON_FIRST_UPGRADE_START, feature =
- QUOTA)
+@UpgradeActionOm(type = ON_FINALIZE, feature = QUOTA)
public class QuotaRepairUpgradeAction implements OmUpgradeAction {
+ private static final Logger LOG =
LoggerFactory.getLogger(QuotaRepairUpgradeAction.class);
@Override
public void execute(OzoneManager arg) throws Exception {
boolean enabled = arg.getConfiguration().getBoolean(
OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE,
OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE_DEFAULT);
if (enabled) {
- QuotaRepairTask quotaRepairTask = new QuotaRepairTask(
- arg.getMetadataManager());
- quotaRepairTask.repair();
+      // Just trigger the quota repair; its status can be checked later via the CLI.
+ try {
+ if (arg.isRatisEnabled()) {
+ arg.checkLeaderStatus();
+ }
+ QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg);
+ quotaRepairTask.repair();
+ } catch (OMNotLeaderException | OMLeaderNotReadyException ex) {
+        // On the leader node, repair is triggered when finalize is called; other nodes ignore it.
+        // It can also be triggered on demand via the CLI tool.
+ LOG.warn("Skip quota repair operation during upgrade on the node as
this is not a leader node.");
+ }
}
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
index 1a0db11833..06b8beacb3 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
@@ -22,8 +22,16 @@ package org.apache.hadoop.ozone.om.service;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.junit.jupiter.api.Assertions.assertEquals;
-
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -32,6 +40,11 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.om.request.key.TestOMKeyRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMQuotaRepairResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.Test;
@@ -44,6 +57,16 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
@Test
public void testQuotaRepair() throws Exception {
+ when(ozoneManager.isRatisEnabled()).thenReturn(false);
+ OzoneManagerProtocolProtos.OMResponse respMock =
mock(OzoneManagerProtocolProtos.OMResponse.class);
+ when(respMock.getSuccess()).thenReturn(true);
+ OzoneManagerProtocolServerSideTranslatorPB serverMock =
mock(OzoneManagerProtocolServerSideTranslatorPB.class);
+ AtomicReference<OzoneManagerProtocolProtos.OMRequest> ref = new
AtomicReference<>();
+ doAnswer(invocation -> {
+ ref.set(invocation.getArgument(1,
OzoneManagerProtocolProtos.OMRequest.class));
+ return respMock;
+ }).when(serverMock).submitRequest(any(), any());
+ when(ozoneManager.getOmServerProtocol()).thenReturn(serverMock);
OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, BucketLayout.OBJECT_STORE);
@@ -88,9 +111,16 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
assertEquals(0, fsoBucketInfo.getUsedNamespace());
assertEquals(0, fsoBucketInfo.getUsedBytes());
- QuotaRepairTask quotaRepairTask = new QuotaRepairTask(omMetadataManager);
- quotaRepairTask.repair();
-
+ QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager);
+ CompletableFuture<Boolean> repair = quotaRepairTask.repair();
+ Boolean repairStatus = repair.get();
+ assertTrue(repairStatus);
+
+ OMQuotaRepairRequest omQuotaRepairRequest = new
OMQuotaRepairRequest(ref.get());
+ OMClientResponse omClientResponse =
omQuotaRepairRequest.validateAndUpdateCache(ozoneManager, 1);
+ BatchOperation batchOperation =
omMetadataManager.getStore().initBatchOperation();
+ ((OMQuotaRepairResponse)omClientResponse).addToDBBatch(omMetadataManager,
batchOperation);
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
// 10 files of each type, obs have replication of three and
// fso have replication of one
OmBucketInfo obsUpdateBucketInfo = omMetadataManager.getBucketTable().get(
@@ -105,6 +135,16 @@ public class TestQuotaRepairTask extends TestOMKeyRequest {
@Test
public void testQuotaRepairForOldVersionVolumeBucket() throws Exception {
+ when(ozoneManager.isRatisEnabled()).thenReturn(false);
+ OzoneManagerProtocolProtos.OMResponse respMock =
mock(OzoneManagerProtocolProtos.OMResponse.class);
+ when(respMock.getSuccess()).thenReturn(true);
+ OzoneManagerProtocolServerSideTranslatorPB serverMock =
mock(OzoneManagerProtocolServerSideTranslatorPB.class);
+ AtomicReference<OzoneManagerProtocolProtos.OMRequest> ref = new
AtomicReference<>();
+ doAnswer(invocation -> {
+ ref.set(invocation.getArgument(1,
OzoneManagerProtocolProtos.OMRequest.class));
+ return respMock;
+ }).when(serverMock).submitRequest(any(), any());
+ when(ozoneManager.getOmServerProtocol()).thenReturn(serverMock);
// add volume with -2 value
OmVolumeArgs omVolumeArgs =
OmVolumeArgs.newBuilder().setCreationTime(Time.now())
@@ -117,13 +157,14 @@ public class TestQuotaRepairTask extends TestOMKeyRequest
{
new CacheKey<>(omMetadataManager.getVolumeKey(volumeName)),
CacheValue.get(1L, omVolumeArgs));
- // add bucket with -2 value
+    // add bucket with quota value -2 and persist it to the DB
OMRequestTestUtils.addBucketToDB(volumeName, bucketName,
omMetadataManager, -2);
+ String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+ omMetadataManager.getBucketTable().put(bucketKey,
omMetadataManager.getBucketTable().get(bucketKey));
// pre check for quota flag
- OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
- omMetadataManager.getBucketKey(volumeName, bucketName));
+ OmBucketInfo bucketInfo =
omMetadataManager.getBucketTable().get(bucketKey);
assertEquals(-2, bucketInfo.getQuotaInBytes());
omVolumeArgs = omMetadataManager.getVolumeTable().get(
@@ -131,11 +172,18 @@ public class TestQuotaRepairTask extends TestOMKeyRequest
{
assertEquals(-2, omVolumeArgs.getQuotaInBytes());
assertEquals(-2, omVolumeArgs.getQuotaInNamespace());
- QuotaRepairTask quotaRepairTask = new QuotaRepairTask(omMetadataManager);
- quotaRepairTask.repair();
+ QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager);
+ CompletableFuture<Boolean> repair = quotaRepairTask.repair();
+ Boolean repairStatus = repair.get();
+ assertTrue(repairStatus);
+ OMQuotaRepairRequest omQuotaRepairRequest = new
OMQuotaRepairRequest(ref.get());
+ OMClientResponse omClientResponse =
omQuotaRepairRequest.validateAndUpdateCache(ozoneManager, 1);
+ BatchOperation batchOperation =
omMetadataManager.getStore().initBatchOperation();
+ ((OMQuotaRepairResponse)omClientResponse).addToDBBatch(omMetadataManager,
batchOperation);
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
bucketInfo = omMetadataManager.getBucketTable().get(
- omMetadataManager.getBucketKey(volumeName, bucketName));
+ bucketKey);
assertEquals(-1, bucketInfo.getQuotaInBytes());
OmVolumeArgs volArgsVerify = omMetadataManager.getVolumeTable()
.get(omMetadataManager.getVolumeKey(volumeName));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]