ChenSammi commented on code in PR #7035:
URL: https://github.com/apache/ozone/pull/7035#discussion_r1716429795
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java:
##########
@@ -64,117 +72,188 @@ public class QuotaRepairTask {
QuotaRepairTask.class);
private static final int BATCH_SIZE = 5000;
private static final int TASK_THREAD_CNT = 3;
- public static final long EPOCH_DEFAULT = -1L;
- private final OMMetadataManager metadataManager;
- private final Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
- private final Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
+ private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false);
+ private static final RepairStatus REPAIR_STATUS = new RepairStatus();
+ private final OzoneManager om;
+ private final AtomicLong runCount = new AtomicLong(0);
private ExecutorService executor;
- private final Map<String, CountPair> keyCountMap = new ConcurrentHashMap<>();
- private final Map<String, CountPair> fileCountMap
- = new ConcurrentHashMap<>();
- private final Map<String, CountPair> directoryCountMap
- = new ConcurrentHashMap<>();
- private final Map<String, String> oldVolumeKeyNameMap = new HashMap();
+ public QuotaRepairTask(OzoneManager ozoneManager) {
+ this.om = ozoneManager;
+ }
- public QuotaRepairTask(OMMetadataManager metadataManager) {
- this.metadataManager = metadataManager;
+ public CompletableFuture<Boolean> repair() throws Exception {
+ // lock in progress operation and reject any other
+ if (!IN_PROGRESS.compareAndSet(false, true)) {
+ LOG.info("quota repair task already running");
+ return CompletableFuture.supplyAsync(() -> false);
+ }
+ REPAIR_STATUS.reset(runCount.get() + 1);
+ return CompletableFuture.supplyAsync(() -> repairTask());
}
-
- public void repair() throws Exception {
- LOG.info("Starting quota repair task");
- prepareAllVolumeBucketInfo();
- IOzoneManagerLock lock = metadataManager.getLock();
- // thread pool with 3 Table type * (1 task each + 3 thread each)
- executor = Executors.newFixedThreadPool(12);
+ public static String getStatus() {
+ return REPAIR_STATUS.toString();
+ }
+ private boolean repairTask() {
+ LOG.info("Starting quota repair task {}", REPAIR_STATUS);
+ OMMetadataManager activeMetaManager = null;
try {
- nameBucketInfoMap.values().stream().forEach(e -> lock.acquireReadLock(
- BUCKET_LOCK, e.getVolumeName(), e.getBucketName()));
- repairCount();
+ // thread pool with 3 Table type * (1 task each + 3 thread each)
+ executor = Executors.newFixedThreadPool(12);
+ OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder
+ = OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
+ // repair active db
+ activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(),
om.getConfiguration());
+ repairActiveDb(activeMetaManager, builder);
+
+ // TODO: repair snapshots for quota
+
+ // submit request to update
+ ClientId clientId = ClientId.randomId();
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.QuotaRepair)
+ .setQuotaRepairRequest(builder.build())
+ .setClientId(clientId.toString())
+ .build();
+ OzoneManagerProtocolProtos.OMResponse response =
submitRequest(omRequest, clientId);
+ if (response != null && response.getSuccess()) {
Review Comment:
`response.getSuccess()` -> `!response.getSuccess()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]