ChenSammi commented on code in PR #7035:
URL: https://github.com/apache/ozone/pull/7035#discussion_r1716434193
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java:
##########
@@ -64,117 +72,188 @@ public class QuotaRepairTask {
QuotaRepairTask.class);
private static final int BATCH_SIZE = 5000;
private static final int TASK_THREAD_CNT = 3;
- public static final long EPOCH_DEFAULT = -1L;
- private final OMMetadataManager metadataManager;
- private final Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
- private final Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
+ private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false);
+ private static final RepairStatus REPAIR_STATUS = new RepairStatus();
+ private final OzoneManager om;
+ private final AtomicLong runCount = new AtomicLong(0);
private ExecutorService executor;
- private final Map<String, CountPair> keyCountMap = new ConcurrentHashMap<>();
- private final Map<String, CountPair> fileCountMap
- = new ConcurrentHashMap<>();
- private final Map<String, CountPair> directoryCountMap
- = new ConcurrentHashMap<>();
- private final Map<String, String> oldVolumeKeyNameMap = new HashMap();
+ public QuotaRepairTask(OzoneManager ozoneManager) {
+ this.om = ozoneManager;
+ }
- public QuotaRepairTask(OMMetadataManager metadataManager) {
- this.metadataManager = metadataManager;
+ public CompletableFuture<Boolean> repair() throws Exception {
+ // lock in progress operation and reject any other
+ if (!IN_PROGRESS.compareAndSet(false, true)) {
+ LOG.info("quota repair task already running");
+ return CompletableFuture.supplyAsync(() -> false);
+ }
+ REPAIR_STATUS.reset(runCount.get() + 1);
+ return CompletableFuture.supplyAsync(() -> repairTask());
}
-
- public void repair() throws Exception {
- LOG.info("Starting quota repair task");
- prepareAllVolumeBucketInfo();
- IOzoneManagerLock lock = metadataManager.getLock();
- // thread pool with 3 Table type * (1 task each + 3 thread each)
- executor = Executors.newFixedThreadPool(12);
+ public static String getStatus() {
+ return REPAIR_STATUS.toString();
+ }
+ private boolean repairTask() {
+ LOG.info("Starting quota repair task {}", REPAIR_STATUS);
+ OMMetadataManager activeMetaManager = null;
try {
- nameBucketInfoMap.values().stream().forEach(e -> lock.acquireReadLock(
- BUCKET_LOCK, e.getVolumeName(), e.getBucketName()));
- repairCount();
+ // thread pool with 3 Table type * (1 task each + 3 thread each)
+ executor = Executors.newFixedThreadPool(12);
+ OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder
+ = OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
+ // repair active db
+ activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(),
om.getConfiguration());
+ repairActiveDb(activeMetaManager, builder);
+
+ // TODO: repair snapshots for quota
+
+ // submit request to update
+ ClientId clientId = ClientId.randomId();
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.QuotaRepair)
+ .setQuotaRepairRequest(builder.build())
+ .setClientId(clientId.toString())
+ .build();
+ OzoneManagerProtocolProtos.OMResponse response =
submitRequest(omRequest, clientId);
+ if (response != null && response.getSuccess()) {
+ LOG.error("update quota repair count response failed");
+ REPAIR_STATUS.updateStatus("Response for update DB is failed");
+ } else {
+ REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
+ }
+ } catch (Exception exp) {
+ LOG.error("quota repair count failed", exp);
+ REPAIR_STATUS.updateStatus(exp.toString());
+ return false;
} finally {
- nameBucketInfoMap.values().stream().forEach(e -> lock.releaseReadLock(
- BUCKET_LOCK, e.getVolumeName(), e.getBucketName()));
+ LOG.info("Completed quota repair task {}", REPAIR_STATUS);
executor.shutdown();
- LOG.info("Completed quota repair task");
+ try {
+ if (null != activeMetaManager) {
+ activeMetaManager.stop();
+ }
+ cleanTempCheckPointPath(om.getMetadataManager());
+ } catch (Exception exp) {
+ LOG.error("failed to cleanup", exp);
+ }
+ IN_PROGRESS.set(false);
}
- updateOldVolumeQuotaSupport();
-
- // cleanup epoch added to avoid extra epoch id in cache
- ArrayList<Long> epochs = new ArrayList<>();
- epochs.add(EPOCH_DEFAULT);
- metadataManager.getBucketTable().cleanupCache(epochs);
- metadataManager.getVolumeTable().cleanupCache(epochs);
+ return true;
}
-
- private void prepareAllVolumeBucketInfo() throws IOException {
- try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
- iterator = metadataManager.getVolumeTable().iterator()) {
- OmVolumeArgs omVolumeArgs;
- while (iterator.hasNext()) {
- Table.KeyValue<String, OmVolumeArgs> entry =
- iterator.next();
- omVolumeArgs = entry.getValue();
- getAllBuckets(omVolumeArgs.getVolume(), omVolumeArgs.getObjectID());
- if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT
- || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) {
- oldVolumeKeyNameMap.put(entry.getKey(),
entry.getValue().getVolume());
- }
+ private void repairActiveDb(
+ OMMetadataManager metadataManager,
+ OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder) throws
Exception {
+ Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
+ Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
+ Map<String, OmBucketInfo> oriBucketInfoMap = new HashMap<>();
+ prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap,
metadataManager);
+
+ repairCount(nameBucketInfoMap, idBucketInfoMap, metadataManager);
+
+ // prepare and submit request to ratis
+ for (Map.Entry<String, OmBucketInfo> entry : nameBucketInfoMap.entrySet())
{
+ OmBucketInfo oriBucketInfo = oriBucketInfoMap.get(entry.getKey());
+ OmBucketInfo updatedBuckedInfo = entry.getValue();
+ boolean oldQuota = oriBucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT
+ || oriBucketInfo.getQuotaInNamespace() == OLD_QUOTA_DEFAULT;
+ if (!(oldQuota || isChange(oriBucketInfo, updatedBuckedInfo))) {
+ continue;
}
+ OzoneManagerProtocolProtos.BucketQuotaCount.Builder bucketCountBuilder
+ = OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder();
+ bucketCountBuilder.setVolName(updatedBuckedInfo.getVolumeName());
+ bucketCountBuilder.setBucketName(updatedBuckedInfo.getBucketName());
+ bucketCountBuilder.setDiffUsedBytes(updatedBuckedInfo.getUsedBytes() -
oriBucketInfo.getUsedBytes());
+ bucketCountBuilder.setDiffUsedNamespace(
+ updatedBuckedInfo.getUsedNamespace() -
oriBucketInfo.getUsedNamespace());
+ bucketCountBuilder.setSupportOldQuota(oldQuota);
+ builder.addBucketCount(bucketCountBuilder.build());
}
+
+ // update volume to support quota
+ builder.setSupportVolumeOldQuota(true);
Review Comment:
If this flag is always set to true, I suggest removing it from the proto
definition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]