devmadhuu commented on code in PR #4626:
URL: https://github.com/apache/ozone/pull/4626#discussion_r1221382755


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java:
##########
@@ -225,4 +328,146 @@ protected boolean checkAndCallFlushToDB(
     }
     return true;
   }
+
+  /**
+   * Flushes the accumulated orphan key metadata to the Recon DB and, on
+   * success, empties the in-memory map so it can be reused for the next
+   * batch of events.
+   *
+   * @param orphanKeyMetaDataMap map of parent object id to orphan metadata.
+   * @param status status value written along with the flushed entries.
+   * @return true when the write and clear succeeded, false when an
+   *         IOException occurred (the map is left untouched in that case).
+   */
+  protected boolean writeFlushAndCommitOrphanKeysMetaDataToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    boolean flushed = true;
+    try {
+      writeOrphanKeysMetaDataToDB(orphanKeyMetaDataMap, status);
+      orphanKeyMetaDataMap.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to write orphan keys meta data in Recon DB.", e);
+      flushed = false;
+    }
+    return flushed;
+  }
+
+  /**
+   * Checks whether the in-memory orphan metadata map has reached the
+   * configured flush threshold and, if so, flushes it to the Recon DB and
+   * clears it via {@link #writeFlushAndCommitOrphanKeysMetaDataToDB}.
+   *
+   * @param orphanKeyMetaDataMap map of parent object id to orphan metadata;
+   *        may be null, in which case this is a no-op.
+   * @param status status value written along with the flushed entries.
+   * @return false only when a threshold-triggered flush failed; true when
+   *         the flush succeeded or no flush was needed.
+   */
+  protected boolean checkOrphanDataAndCallWriteFlushToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    // If the map holds at least the configured threshold of entries,
+    // flush to DB and clear the map.
+    if (null != orphanKeyMetaDataMap && orphanKeyMetaDataMap.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return writeFlushAndCommitOrphanKeysMetaDataToDB(
+          orphanKeyMetaDataMap, status);
+    }
+    return true;
+  }
+
+  /**
+   * Deletes the given parent ids from the orphanKeysMetaDataTable using a
+   * single RocksDB batch operation.
+   *
+   * Per-entry failures while staging deletes are logged and skipped so one
+   * bad entry does not abort the whole batch.
+   *
+   * @param orphanKeysParentIdList parent object ids to remove.
+   * @throws IOException if the batch operation resource cannot be
+   *         created or closed.
+   */
+  protected void deleteOrphanKeysMetaDataFromDB(
+      List<Long> orphanKeysParentIdList) throws IOException {
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      orphanKeysParentIdList.forEach(parentId -> {
+        try {
+          reconNamespaceSummaryManager.batchDeleteOrphanKeyMetaData(
+              rdbBatchOperation, parentId);
+        } catch (IOException e) {
+          LOG.error(
+              "Unable to delete orphan keys from orphanKeysMetaDataTable " +
+                  "in Recon DB.", e);
+        }
+      });
+      try {
+        reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
+      } catch (IOException e) {
+        // Logging as Info as we don't want to log as error when any dir not
+        // found in orphan candidate metadata set. This is done to avoid 2
+        // rocks DB operations - check if present and then delete operation.
+        LOG.info("Delete batch unable to delete few entries as dir may not" +
+            " be found in orphan candidate metadata set", e);
+      }
+    }
+  }
+
+  /**
+   * Removes the collected orphan parent ids from the Recon DB in one batch
+   * delete, clearing the pending list when the delete succeeds.
+   *
+   * @param orphanKeysParentIdList parent object ids queued for deletion.
+   * @return true when the batch delete and clear succeeded, false when an
+   *         IOException occurred (the list is left untouched in that case).
+   */
+  protected boolean batchDeleteAndCommitOrphanKeysMetaDataToDB(
+      List<Long> orphanKeysParentIdList) {
+    boolean deleted = true;
+    try {
+      deleteOrphanKeysMetaDataFromDB(orphanKeysParentIdList);
+      orphanKeysParentIdList.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to delete orphan keys meta data from Recon DB.", e);
+      deleted = false;
+    }
+    return deleted;
+  }
+
+  /**
+   * Checks whether the pending delete list has reached the configured flush
+   * threshold and, if so, issues the batch delete and clears the list via
+   * {@link #batchDeleteAndCommitOrphanKeysMetaDataToDB}.
+   *
+   * @param orphanKeysParentIdList parent object ids queued for deletion;
+   *        may be null, in which case this is a no-op.
+   * @return false only when a threshold-triggered delete failed; true when
+   *         the delete succeeded or no delete was needed.
+   */
+  protected boolean checkOrphanDataThresholdAndAddToDeleteBatch(
+      List<Long> orphanKeysParentIdList) {
+    // If the list holds at least the configured threshold of entries,
+    // flush the deletes to DB and clear the list.
+    if (null != orphanKeysParentIdList && orphanKeysParentIdList.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return batchDeleteAndCommitOrphanKeysMetaDataToDB(
+          orphanKeysParentIdList);
+    }
+    return true;
+  }
+
+  /**
+   * Records a file or directory object as an orphan candidate under its
+   * parent object id in the given in-memory map.
+   *
+   * When {@code parentExist} is true, the child's object id is merged into
+   * the parent's existing {@code OrphanKeyMetaData} (looked up first in the
+   * map, then in the Recon DB); if no metadata exists in either place, the
+   * child is silently dropped. When {@code parentExist} is false, a fresh
+   * metadata entry with the given status is created for the parent.
+   *
+   * @param fileDirObjInfo object carrying the child/parent object ids.
+   * @param orphanKeyMetaDataMap map of parent object id to orphan metadata;
+   *        may be null, in which case this is a no-op.
+   * @param status status value stored in newly created metadata entries.
+   * @param parentExist whether orphan metadata for the parent is expected
+   *        to already exist.
+   * @throws IOException if the Recon DB lookup fails.
+   */
+  private <T extends WithParentObjectId> void addOrphanCandidate(
+      T fileDirObjInfo,
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap,
+      long status,
+      boolean parentExist)
+      throws IOException {
+    if (null != orphanKeyMetaDataMap) {
+      long objectID = fileDirObjInfo.getObjectID();
+      long parentObjectID = fileDirObjInfo.getParentObjectID();
+      if (parentExist) {
+        // Prefer the in-memory (not yet flushed) entry; fall back to the
+        // copy already persisted in the Recon DB.
+        OrphanKeyMetaData orphanKeyMetaData =
+            orphanKeyMetaDataMap.get(parentObjectID);
+        if (null == orphanKeyMetaData) {
+          orphanKeyMetaData =
+              reconNamespaceSummaryManager.getOrphanKeyMetaData(
+                  parentObjectID);
+        }
+        if (null != orphanKeyMetaData) {
+          // Mutate the shared id set, then (re)put so a DB-loaded entry
+          // also lands in the map for the next flush.
+          Set<Long> objectIds = orphanKeyMetaData.getObjectIds();
+          objectIds.add(objectID);
+          orphanKeyMetaDataMap.put(parentObjectID, orphanKeyMetaData);
+        }
+      } else {
+        // No prior metadata expected: start a new entry for this parent.
+        Set<Long> objectIds = new HashSet<>();
+        objectIds.add(objectID);
+        OrphanKeyMetaData orphanKeyMetaData =
+            new OrphanKeyMetaData(objectIds, status);
+        orphanKeyMetaDataMap.put(parentObjectID, orphanKeyMetaData);
+      }
+    }
+  }
+
+  protected boolean verifyOrphanParentsForBucket(
+      Set<Long> bucketObjectIdsSet,
+      List<Long> toBeDeletedBucketObjectIdsFromOrphanMap)
+      throws IOException {
+    try (TableIterator<Long, ? extends Table.KeyValue<Long,
+        OrphanKeyMetaData>> orphanKeysMetaDataIter =
+             orphanKeysMetaDataTable.iterator()) {
+      while (orphanKeysMetaDataIter.hasNext()) {
+        Table.KeyValue<Long, OrphanKeyMetaData> keyValue =
+            orphanKeysMetaDataIter.next();
+        Long parentId = keyValue.getKey();
+        if (bucketObjectIdsSet.contains(parentId)) {
+          toBeDeletedBucketObjectIdsFromOrphanMap.add(parentId);
+          if (!checkOrphanDataThresholdAndAddToDeleteBatch(
+              toBeDeletedBucketObjectIdsFromOrphanMap)) {
+            return true;

Review Comment:
   Ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to