sumitagrawl commented on code in PR #4626:
URL: https://github.com/apache/ozone/pull/4626#discussion_r1217913688


##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java:
##########
@@ -33,8 +33,6 @@ public static LongCodec get() {
     return CODEC;
   }
 
-  private LongCodec() { }

Review Comment:
   The constructor needs to be private, since we do not allow defining new
codec instances; it is a singleton object.
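   For reference, a sketch of the pattern LongCodec already follows (keep the
private constructor):
   ```java
   public final class LongCodec implements Codec<Long> {

     private static final LongCodec CODEC = new LongCodec();

     public static LongCodec get() {
       return CODEC;
     }

     // Private constructor: instances are only available via get().
     private LongCodec() { }
   }
   ```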



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java:
##########
@@ -46,20 +60,32 @@ public class NSSummaryTaskDbEventHandler {
       LoggerFactory.getLogger(NSSummaryTaskDbEventHandler.class);
   private ReconNamespaceSummaryManager reconNamespaceSummaryManager;
   private ReconOMMetadataManager reconOMMetadataManager;
+  private DBStore reconDbStore;

Review Comment:
   Can `reconDbStore` be final?
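   e.g. a sketch, assuming the store is only assigned in the constructor
(constructor shape assumed here):
   ```java
   private final DBStore reconDbStore;   // assigned exactly once

   public NSSummaryTaskDbEventHandler(ReconDBProvider reconDBProvider) {
     this.reconDbStore = reconDBProvider.getDbStore();
   }
   ```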



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/codec/OrphanKeyMetaDataCodec.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.recon.codec;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.ozone.recon.api.types.OrphanKeyMetaData;
+
+import java.io.IOException;
+
+/**
+ * Codec for OrphanKeyMetaDataSet.
+ */
+public class OrphanKeyMetaDataCodec
+    implements Codec<OrphanKeyMetaData> {
+
+  @Override
+  public byte[] toPersistedFormat(OrphanKeyMetaData obj) {

Review Comment:
   Similar to LongCodec, this can also be a singleton; it can follow the same
pattern.
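   A sketch following the LongCodec pattern:
   ```java
   public final class OrphanKeyMetaDataCodec
       implements Codec<OrphanKeyMetaData> {

     private static final OrphanKeyMetaDataCodec CODEC =
         new OrphanKeyMetaDataCodec();

     public static OrphanKeyMetaDataCodec get() {
       return CODEC;
     }

     // Private constructor: instances are only available via get().
     private OrphanKeyMetaDataCodec() { }

     // toPersistedFormat / fromPersistedFormat stay as in this PR.
   }
   ```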



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OrphanKeyDetectionTask.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import org.apache.hadoop.ozone.recon.api.types.OrphanKeyMetaData;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.recon.spi.impl.ReconDBDefinition.ORPHAN_KEYS_METADATA;
+
+/**
+ * Task class to iterate over the OM DB and detect the orphan keys metadata.
+ */
+public class OrphanKeyDetectionTask implements ReconOmTask {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OrphanKeyDetectionTask.class);
+  private DBStore reconDbStore;

Review Comment:
   This member variable can be defined as `final`.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java:
##########
@@ -225,4 +328,146 @@ protected boolean checkAndCallFlushToDB(
     }
     return true;
   }
+
+  protected boolean writeFlushAndCommitOrphanKeysMetaDataToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    try {
+      writeOrphanKeysMetaDataToDB(orphanKeyMetaDataMap, status);
+      orphanKeyMetaDataMap.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to write orphan keys meta data in Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
+  protected boolean checkOrphanDataAndCallWriteFlushToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    // if the map size reaches the flush threshold, flush to DB and clear it
+    if (null != orphanKeyMetaDataMap && orphanKeyMetaDataMap.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return writeFlushAndCommitOrphanKeysMetaDataToDB(
+          orphanKeyMetaDataMap, status);
+    }
+    return true;
+  }
+
+  protected void deleteOrphanKeysMetaDataFromDB(
+      List<Long> orphanKeysParentIdList) throws IOException {
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      orphanKeysParentIdList.forEach(parentId -> {
+        try {
+          reconNamespaceSummaryManager.batchDeleteOrphanKeyMetaData(
+              rdbBatchOperation, parentId);
+        } catch (IOException e) {
+          LOG.error(
+              "Unable to delete orphan keys from orphanKeysMetaDataTable " +
+                  "in Recon DB.", e);
+        }
+      });
+      try {
+        reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
+      } catch (IOException e) {
+        // Logging as Info as we don't want to log as error when any dir not
+        // found in orphan candidate metadata set. This is done to avoid 2
+        // rocks DB operations - check if present and then delete operation.
+        LOG.info("Delete batch unable to delete few entries as dir may not be" 
+
+            " found in orphan candidate metadata set");
+      }
+    }
+  }
+
+  protected boolean batchDeleteAndCommitOrphanKeysMetaDataToDB(
+      List<Long> orphanKeysParentIdList) {
+    try {
+      deleteOrphanKeysMetaDataFromDB(orphanKeysParentIdList);
+      orphanKeysParentIdList.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to delete orphan keys meta data from Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
+  protected boolean checkOrphanDataThresholdAndAddToDeleteBatch(
+      List<Long> orphanKeysParentIdList) {
+    // if the list size reaches the threshold, flush deletes to DB and clear it
+    if (null != orphanKeysParentIdList && orphanKeysParentIdList.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return batchDeleteAndCommitOrphanKeysMetaDataToDB(orphanKeysParentIdList);
+    }
+    return true;
+  }
+
+  private <T extends WithParentObjectId> void addOrphanCandidate(
+      T fileDirObjInfo,
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap,
+      long status,
+      boolean parentExist)
+      throws IOException {
+    if (null != orphanKeyMetaDataMap) {
+      long objectID = fileDirObjInfo.getObjectID();
+      long parentObjectID = fileDirObjInfo.getParentObjectID();
+      if (parentExist) {
+        OrphanKeyMetaData orphanKeyMetaData =
+            orphanKeyMetaDataMap.get(parentObjectID);
+        if (null == orphanKeyMetaData) {
+          orphanKeyMetaData =
+              reconNamespaceSummaryManager.getOrphanKeyMetaData(
+                  parentObjectID);
+        }
+        if (null != orphanKeyMetaData) {
+          Set<Long> objectIds = orphanKeyMetaData.getObjectIds();
+          objectIds.add(objectID);
+          orphanKeyMetaDataMap.put(parentObjectID, orphanKeyMetaData);
+        }
+      } else {
+        Set<Long> objectIds = new HashSet<>();
+        objectIds.add(objectID);
+        OrphanKeyMetaData orphanKeyMetaData =
+            new OrphanKeyMetaData(objectIds, status);
+        orphanKeyMetaDataMap.put(parentObjectID, orphanKeyMetaData);
+      }
+    }
+  }
+
+  protected boolean verifyOrphanParentsForBucket(
+      Set<Long> bucketObjectIdsSet,
+      List<Long> toBeDeletedBucketObjectIdsFromOrphanMap)
+      throws IOException {
+    try (TableIterator<Long, ? extends Table.KeyValue<Long,

Review Comment:
   Add a comment: if an orphan candidate's parentId matches a bucket and the
bucket exists, then it is not an orphan (a bucket is never present in the
key/file table as a parent), so remove it from the orphan map.
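   A sketch of the suggested comment:
   ```java
   /**
    * If an orphan candidate's parentId matches an existing bucket's objectId,
    * its children are not orphans (a bucket is never present as a parent in
    * the key/file table), so the entry is removed from the orphan map.
    */
   ```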



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java:
##########
@@ -133,25 +193,42 @@ protected void handlePutDirEvent(OmDirectoryInfo directoryInfo,
     }
     curNSSummary.setDirName(dirName);
     nsSummaryMap.put(objectId, curNSSummary);
-
+    removeFromOrphanIfExists(omDirectoryInfo, orphanKeyMetaDataMap);
     // Write the child dir list to the parent directory
     // Try to get the NSSummary from our local map that maps NSSummaries to IDs
     NSSummary nsSummary = nsSummaryMap.get(parentObjectId);
     if (nsSummary == null) {
       // If we don't have it in this batch we try to get it from the DB
       nsSummary = reconNamespaceSummaryManager.getNSSummary(parentObjectId);
-    }
-    if (nsSummary == null) {
-      // If we don't have it locally and in the DB we create a new instance
-      // as this is a new ID
-      nsSummary = new NSSummary();
+      if (nsSummary == null) {
+        // If we don't have it locally and in the DB we create a new instance
+        // as this is a new ID
+        nsSummary = new NSSummary();
+        addOrphanCandidate(omDirectoryInfo, orphanKeyMetaDataMap,
+            status, false);
+      } else {
+        addOrphanCandidate(omDirectoryInfo, orphanKeyMetaDataMap,
+            status, true);
+      }
     }
     nsSummary.addChildDir(objectId);
     nsSummaryMap.put(parentObjectId, nsSummary);
   }
 
-  protected void handleDeleteKeyEvent(OmKeyInfo keyInfo,
-                                      Map<Long, NSSummary> nsSummaryMap)
+  private <T extends WithParentObjectId> void removeFromOrphanIfExists(
+      T fileDirInfo,
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap) throws IOException {
+    if (null != orphanKeyMetaDataMap) {

Review Comment:
   Need to add a comment: if an object arrives as a parent, then its children
are not orphans, and the parent can be removed from the orphan map.
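   A sketch of the suggested comment:
   ```java
   /**
    * If an object arrives as a parent (a put event for the directory/file
    * itself), its children are no longer orphan candidates, so the parent
    * entry can be removed from the orphan map.
    */
   ```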



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java:
##########
@@ -86,19 +112,50 @@ protected void writeNSSummariesToDB(Map<Long, NSSummary> nsSummaryMap)
     }
   }
 
-  protected void handlePutKeyEvent(OmKeyInfo keyInfo, Map<Long,
-      NSSummary> nsSummaryMap) throws IOException {
+  protected void writeOrphanKeysMetaDataToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status)
+      throws IOException {
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      orphanKeyMetaDataMap.keySet().forEach((Long key) -> {
+        try {
+          OrphanKeyMetaData orphanKeyMetaData =

Review Comment:
   If the metadata does not have any children, then we should delete the entry.
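   Something like this sketch; `batchStoreOrphanKeyMetaData` and `setStatus`
are assumed names here:
   ```java
   OrphanKeyMetaData orphanKeyMetaData = orphanKeyMetaDataMap.get(key);
   if (orphanKeyMetaData.getObjectIds().isEmpty()) {
     // No children left: drop the entry instead of persisting an empty set.
     reconNamespaceSummaryManager.batchDeleteOrphanKeyMetaData(
         rdbBatchOperation, key);
   } else {
     orphanKeyMetaData.setStatus(status);
     reconNamespaceSummaryManager.batchStoreOrphanKeyMetaData(
         rdbBatchOperation, key, orphanKeyMetaData);
   }
   ```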



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java:
##########
@@ -47,15 +57,19 @@ public class NSSummaryTaskWithFSO extends NSSummaryTaskDbEventHandler {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(NSSummaryTaskWithFSO.class);
+  private ReconOMMetadataManager reconOMMetadataManager;

Review Comment:
   This member can be defined as `final` (it is already private).



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java:
##########
@@ -225,4 +328,146 @@ protected boolean checkAndCallFlushToDB(
     }
     return true;
   }
+
+  protected boolean writeFlushAndCommitOrphanKeysMetaDataToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    try {
+      writeOrphanKeysMetaDataToDB(orphanKeyMetaDataMap, status);
+      orphanKeyMetaDataMap.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to write orphan keys meta data in Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
+  protected boolean checkOrphanDataAndCallWriteFlushToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    // if the map size reaches the flush threshold, flush to DB and clear it
+    if (null != orphanKeyMetaDataMap && orphanKeyMetaDataMap.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return writeFlushAndCommitOrphanKeysMetaDataToDB(
+          orphanKeyMetaDataMap, status);
+    }
+    return true;
+  }
+
+  protected void deleteOrphanKeysMetaDataFromDB(
+      List<Long> orphanKeysParentIdList) throws IOException {
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      orphanKeysParentIdList.forEach(parentId -> {
+        try {
+          reconNamespaceSummaryManager.batchDeleteOrphanKeyMetaData(
+              rdbBatchOperation, parentId);
+        } catch (IOException e) {
+          LOG.error(
+              "Unable to delete orphan keys from orphanKeysMetaDataTable " +
+                  "in Recon DB.", e);
+        }
+      });
+      try {
+        reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
+      } catch (IOException e) {
+        // Logging as Info as we don't want to log as error when any dir not
+        // found in orphan candidate metadata set. This is done to avoid 2
+        // rocks DB operations - check if present and then delete operation.
+        LOG.info("Delete batch unable to delete few entries as dir may not be" 
+
+            " found in orphan candidate metadata set");
+      }
+    }
+  }
+
+  protected boolean batchDeleteAndCommitOrphanKeysMetaDataToDB(
+      List<Long> orphanKeysParentIdList) {
+    try {
+      deleteOrphanKeysMetaDataFromDB(orphanKeysParentIdList);
+      orphanKeysParentIdList.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to delete orphan keys meta data from Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
+  protected boolean checkOrphanDataThresholdAndAddToDeleteBatch(
+      List<Long> orphanKeysParentIdList) {
+    // if the list size reaches the threshold, flush deletes to DB and clear it
+    if (null != orphanKeysParentIdList && orphanKeysParentIdList.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return batchDeleteAndCommitOrphanKeysMetaDataToDB(orphanKeysParentIdList);
+    return true;
+  }
+
+  private <T extends WithParentObjectId> void addOrphanCandidate(
+      T fileDirObjInfo,
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap,
+      long status,
+      boolean parentExist)
+      throws IOException {
+    if (null != orphanKeyMetaDataMap) {
+      long objectID = fileDirObjInfo.getObjectID();
+      long parentObjectID = fileDirObjInfo.getParentObjectID();
+      if (parentExist) {

Review Comment:
   Please add a comment to clarify the logic:
   If the parent exists in the NSSummary map, check via orphanKeyMetaData
whether the parent is already an orphan; if it is, add this child to the
orphan list as well.

   If the parent does not exist in the NSSummary map, then this child can be
an orphan.
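   A sketch of the suggested comment:
   ```java
   /**
    * If the parent exists in the NSSummary map, check via orphanKeyMetaData
    * whether the parent is already an orphan candidate; if it is, add this
    * child to its orphan object-id set as well.
    *
    * If the parent does not exist in the NSSummary map, this child itself
    * becomes a new orphan candidate.
    */
   ```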



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java:
##########
@@ -225,4 +328,146 @@ protected boolean checkAndCallFlushToDB(
     }
     return true;
   }
+
+  protected boolean writeFlushAndCommitOrphanKeysMetaDataToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    try {
+      writeOrphanKeysMetaDataToDB(orphanKeyMetaDataMap, status);
+      orphanKeyMetaDataMap.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to write orphan keys meta data in Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
+  protected boolean checkOrphanDataAndCallWriteFlushToDB(
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap, long status) {
+    // if the map size reaches the flush threshold, flush to DB and clear it
+    if (null != orphanKeyMetaDataMap && orphanKeyMetaDataMap.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return writeFlushAndCommitOrphanKeysMetaDataToDB(
+          orphanKeyMetaDataMap, status);
+    }
+    return true;
+  }
+
+  protected void deleteOrphanKeysMetaDataFromDB(
+      List<Long> orphanKeysParentIdList) throws IOException {
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      orphanKeysParentIdList.forEach(parentId -> {
+        try {
+          reconNamespaceSummaryManager.batchDeleteOrphanKeyMetaData(
+              rdbBatchOperation, parentId);
+        } catch (IOException e) {
+          LOG.error(
+              "Unable to delete orphan keys from orphanKeysMetaDataTable " +
+                  "in Recon DB.", e);
+        }
+      });
+      try {
+        reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
+      } catch (IOException e) {
+        // Logging as Info as we don't want to log as error when any dir not
+        // found in orphan candidate metadata set. This is done to avoid 2
+        // rocks DB operations - check if present and then delete operation.
+        LOG.info("Delete batch unable to delete few entries as dir may not be" 
+
+            " found in orphan candidate metadata set");
+      }
+    }
+  }
+
+  protected boolean batchDeleteAndCommitOrphanKeysMetaDataToDB(
+      List<Long> orphanKeysParentIdList) {
+    try {
+      deleteOrphanKeysMetaDataFromDB(orphanKeysParentIdList);
+      orphanKeysParentIdList.clear();
+    } catch (IOException e) {
+      LOG.error("Unable to delete orphan keys meta data from Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
+  protected boolean checkOrphanDataThresholdAndAddToDeleteBatch(
+      List<Long> orphanKeysParentIdList) {
+    // if the list size reaches the threshold, flush deletes to DB and clear it
+    if (null != orphanKeysParentIdList && orphanKeysParentIdList.size() >=
+        orphanKeysFlushToDBMaxThreshold) {
+      return batchDeleteAndCommitOrphanKeysMetaDataToDB(orphanKeysParentIdList);
+    }
+    return true;
+  }
+
+  private <T extends WithParentObjectId> void addOrphanCandidate(
+      T fileDirObjInfo,
+      Map<Long, OrphanKeyMetaData> orphanKeyMetaDataMap,
+      long status,
+      boolean parentExist)
+      throws IOException {
+    if (null != orphanKeyMetaDataMap) {
+      long objectID = fileDirObjInfo.getObjectID();
+      long parentObjectID = fileDirObjInfo.getParentObjectID();
+      if (parentExist) {
+        OrphanKeyMetaData orphanKeyMetaData =
+            orphanKeyMetaDataMap.get(parentObjectID);
+        if (null == orphanKeyMetaData) {
+          orphanKeyMetaData =
+              reconNamespaceSummaryManager.getOrphanKeyMetaData(
+                  parentObjectID);
+        }
+        if (null != orphanKeyMetaData) {
+          Set<Long> objectIds = orphanKeyMetaData.getObjectIds();
+          objectIds.add(objectID);
+          orphanKeyMetaDataMap.put(parentObjectID, orphanKeyMetaData);
+        }
+      } else {
+        Set<Long> objectIds = new HashSet<>();
+        objectIds.add(objectID);
+        OrphanKeyMetaData orphanKeyMetaData =
+            new OrphanKeyMetaData(objectIds, status);
+        orphanKeyMetaDataMap.put(parentObjectID, orphanKeyMetaData);
+      }
+    }
+  }
+
+  protected boolean verifyOrphanParentsForBucket(
+      Set<Long> bucketObjectIdsSet,
+      List<Long> toBeDeletedBucketObjectIdsFromOrphanMap)
+      throws IOException {
+    try (TableIterator<Long, ? extends Table.KeyValue<Long,
+        OrphanKeyMetaData>> orphanKeysMetaDataIter =
+             orphanKeysMetaDataTable.iterator()) {
+      while (orphanKeysMetaDataIter.hasNext()) {
+        Table.KeyValue<Long, OrphanKeyMetaData> keyValue =
+            orphanKeysMetaDataIter.next();
+        Long parentId = keyValue.getKey();
+        if (bucketObjectIdsSet.contains(parentId)) {
+          toBeDeletedBucketObjectIdsFromOrphanMap.add(parentId);
+          if (!checkOrphanDataThresholdAndAddToDeleteBatch(
+              toBeDeletedBucketObjectIdsFromOrphanMap)) {
+            return true;

Review Comment:
   This will never return false, as the IOException is suppressed and only
logged.
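   One way to surface the failure (a sketch; wraps the checked exception so it
escapes the lambda and callers can actually return false):
   ```java
   orphanKeysParentIdList.forEach(parentId -> {
     try {
       reconNamespaceSummaryManager.batchDeleteOrphanKeyMetaData(
           rdbBatchOperation, parentId);
     } catch (IOException e) {
       // Propagate (java.io.UncheckedIOException) instead of only logging,
       // so the failure escapes the lambda.
       throw new UncheckedIOException(
           "Unable to delete orphan key " + parentId + " from Recon DB.", e);
     }
   });
   ```
   The caller (batchDeleteAndCommitOrphanKeysMetaDataToDB) would then need to
catch UncheckedIOException, or unwrap it, before returning false.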



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OrphanKeyDetectionTask.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import org.apache.hadoop.ozone.recon.api.types.OrphanKeyMetaData;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.recon.spi.impl.ReconDBDefinition.ORPHAN_KEYS_METADATA;
+
+/**
+ * Task class to iterate over the OM DB and detect the orphan keys metadata.
+ */
+public class OrphanKeyDetectionTask implements ReconOmTask {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OrphanKeyDetectionTask.class);
+  private DBStore reconDbStore;
+  private ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+  private final Table<Long, OrphanKeyMetaData> orphanKeysMetaDataTable;
+
+  @Inject
+  public OrphanKeyDetectionTask(
+      ReconDBProvider reconDBProvider,
+      ReconNamespaceSummaryManager reconNamespaceSummaryManager)
+      throws IOException {
+    this.reconDbStore = reconDBProvider.getDbStore();
+    this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
+    this.orphanKeysMetaDataTable =
+        ORPHAN_KEYS_METADATA.getTable(reconDbStore);
+  }
+
+  @Override
+  public String getTaskName() {
+    return "OrphanKeyDetectionTask";
+  }
+
+  public Collection<String> getTaskTables() {
+    List<String> taskTables = new ArrayList<>();
+    taskTables.add(DELETED_DIR_TABLE);

Review Comment:
   This is used to clean up an orphan parent if it already exists in
deletedDirTable for FSO. Based on this logic, we also need cleanup for the
following event:
   1. bucket delete event

   This can also help handle the NSSummary memory leak, where an objectId is
never cleaned up on a DELETE event for a directory or bucket.
   - We can rename this class to reflect that it is used for NSSummary, or
merge it with the existing NSSummaryTaskDbEventHandler.
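   A sketch of the bucket-delete hook; `BUCKET_TABLE` is assumed here to be
the OmMetadataManagerImpl constant:
   ```java
   public Collection<String> getTaskTables() {
     List<String> taskTables = new ArrayList<>();
     taskTables.add(DELETED_DIR_TABLE);
     // Also listen on the bucket table so a bucket delete event can trigger
     // cleanup of orphan candidates whose parentId is the deleted bucket's
     // objectId.
     taskTables.add(BUCKET_TABLE);
     return taskTables;
   }
   ```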
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

