xichen01 commented on code in PR #10412:
URL: https://github.com/apache/ozone/pull/10412#discussion_r3425968112


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -401,108 +475,226 @@ public BackgroundTaskResult call() {
     @SuppressWarnings("checkstyle:parameternumber")
     private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, 
String bucketKey,
         Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
-        LimitedExpiredObjectList expiredKeyList, LimitedExpiredObjectList 
expiredDirList) {
+        LimitedExpiredObjectList expiredKeyList, LimitedExpiredObjectList 
expiredDirList,
+        OmLifecycleScanState.Builder scanStateBuilder) {
       List<OmLCRule> prefixRuleList =
           ruleList.stream().filter(r -> 
r.isPrefixEnable()).collect(Collectors.toList());
       // r.isPrefixEnable() == false means empty filter
       List<OmLCRule> noPrefixRuleList =
           ruleList.stream().filter(r -> 
!r.isPrefixEnable()).collect(Collectors.toList());
 
-      for (OmLCRule rule : prefixRuleList) {
-        // find KeyInfo of each directory for prefix
-        List<OmDirectoryInfo> dirList;
+      if (!noPrefixRuleList.isEmpty()) {
+        // evaluate all rules against each key
+        prefixRuleList.addAll(noPrefixRuleList);
+        evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null, "",
+            prefixRuleList, expiredKeyList, expiredDirList, scanStateBuilder);
+        return;
+      }
+
+      List<RuleListWithDirectoryList> unionPrefixRuleList =
+          getRuleUnion(volume.getObjectID(), bucket, prefixRuleList, 
bucketKey);
+
+      if (unionPrefixRuleList != null) {
+        if (unionPrefixRuleList.isEmpty()) {
+          // fallback to evaluate the whole bucket
+          evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null, "",
+              prefixRuleList, expiredKeyList, expiredDirList, 
scanStateBuilder);
+        } else {
+          for (RuleListWithDirectoryList ruleWithDirList : 
unionPrefixRuleList) {
+            List<OmLCRule> rules = ruleWithDirList.getRuleList();
+            DirectoryList dir = ruleWithDirList.getDirList();
+            evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, 
dir.getLastSubDirPath(),
+                dir.getLastSubDir(), dir.getLastSubDirKey(),
+                rules, expiredKeyList, expiredDirList, scanStateBuilder);
+          }
+        }
+      }
+    }
+
+    /**
+     * Finds the directory union list from a list of prefixes and sorts them
+     * according to the FSO depth-first iteration order.
+     */
+    private List<RuleListWithDirectoryList> getRuleUnion(long volumeId, 
OmBucketInfo bucket,
+        List<OmLCRule> rules, String bucketKey) {
+
+      if (rules.isEmpty() || rules.stream().anyMatch(
+          r -> r.getEffectivePrefix() == null || 
r.getEffectivePrefix().isEmpty())) {
+        // The union of anything with the root is just the root itself.
+        return new ArrayList();
+      }
+
+      List<RuleListWithDirectoryList> effectiveRuleList = new ArrayList<>();
+      for (OmLCRule rule : rules) {
+        String prefix = rule.getEffectivePrefix();
+        // Resolve each prefix to actual FSO directories in the DB
         try {
-          dirList = getDirList(volume, bucket, rule.getEffectivePrefix(), 
bucketKey);
+          if (!prefix.endsWith(OzoneConsts.OM_KEY_PREFIX)) {
+            // FSO bucket doesn't allow prefix without tailing '/'
+            // Prefix ends with a slash, it explicitly refers to a directory 
(e.g. "log/")
+            LOG.warn("Skip rule {} since FILE_SYSTEM_OPTIMIZED bucket prefix 
must end with '/'", rule);
+            continue;
+          }
+
+          // Normalize by removing the trailing slash for uniform comparison
+          String normalizedPrefix = prefix.substring(0, prefix.length() - 1);
+          DirectoryList dirList = getDirList(volumeId, bucket, 
normalizedPrefix, bucketKey);
+          // If the prefix is log/, and "log" dir really exists, then the 
matched dir is "log".
+          // Otherwise, this rule doesn't match any dir/file in this FSO 
bucket, this rule can be skipped.
+          if (!dirList.isEmpty() && dirList.isAllResolvedPrefix()) {
+            RuleListWithDirectoryList ruleListWithDirectoryList = new 
RuleListWithDirectoryList(
+                Collections.singletonList(rule), dirList, prefix);
+            effectiveRuleList.add(ruleListWithDirectoryList);
+          }
         } catch (IOException e) {
-          LOG.warn("Skip rule {} as its prefix doesn't have all directory 
exist", rule);
-          // skip this rule if some directory doesn't exist for this rule's 
prefix
+          // Directory doesn't exist or IO error, skip this rule
+          LOG.warn("Skip to evaluate rule {} due to failed to resolve prefix 
{} for bucket {}",
+              rule, prefix, bucketKey, e);
+        }
+      }
+
+      if (effectiveRuleList.isEmpty()) {
+        // there is no valid rule found, either prefix doesn't end with "/",
+        // or any directory along the prefix cannot be found.
+        LOG.warn("Prefix of all rules of bucket {} cannot be resolved to an 
existing directory. ", bucketKey);
+        return null;
+      }
+
+      if (effectiveRuleList.size() == 1) {
+        return effectiveRuleList;
+      }
+
+      // Find if one rule's prefix is the sub string of another rule's prefix.
+      // e.g.
+      // dir1/dir2/, dir1/dir2/dir3/, dir1/ -> dir1/
+      // dir1/dir2/, dir1/dir3/, dir1/dir4/ -> dir1/dir2/, dir1/dir3/, 
dir1/dir4dir1/
+      // dir1/dir2/dir3/, dir1/dir2/, dir2/ -> dir1/dir2/, dir2/
+      // dir1/dir2/, dir1/dir3/, dir1/ -> dir1/
+      List<RuleListWithDirectoryList> consolidatedRules = new ArrayList<>();
+      Set<OmLCRule> skipEvaluatedRuleList = new HashSet<>();
+      for (int i = 0; i < effectiveRuleList.size(); i++) {
+        OmLCRule rule = effectiveRuleList.get(i).getRuleList().get(0);
+        if (skipEvaluatedRuleList.contains(rule)) {
           continue;
         }
-        StringBuffer lastDirPath = new StringBuffer();
-        OmDirectoryInfo lastDir = null;
-        if (!dirList.isEmpty()) {
-          lastDir = dirList.get(dirList.size() - 1);
-          for (int i = 0; i < dirList.size(); i++) {
-            lastDirPath.append(dirList.get(i).getName());
-            if (i != dirList.size() - 1) {
-              lastDirPath.append(OM_KEY_PREFIX);
-            }
-          }
-          if (lastDirPath.toString().startsWith(TRASH_PREFIX)) {
-            LOG.info("Skip evaluate trash directory {}", lastDirPath);
-          } else {
-            evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, 
lastDirPath.toString(), lastDir,
-                Arrays.asList(rule), expiredKeyList, expiredDirList);
+
+        RuleListWithDirectoryList consolidatedCandidate = new 
RuleListWithDirectoryList();
+        String consolidatedPrefix = 
effectiveRuleList.get(i).getConsolidatedPrefix();
+        String finalRuleIndexID = rule.getId();
+        DirectoryList finalDirList = effectiveRuleList.get(i).getDirList();
+        for (int j = i + 1; j < effectiveRuleList.size(); j++) {
+          OmLCRule otherRule = effectiveRuleList.get(j).getRuleList().get(0);
+          if (skipEvaluatedRuleList.contains(otherRule)) {
+            continue;
           }
 
-          if (!rule.getEffectivePrefix().endsWith(OM_KEY_PREFIX)) {
-            // if the prefix doesn't end with "/", then also search and 
evaluate the directory itself
-            // for example, "dir1/dir2" matches both directory "dir1/dir2" and 
"dir1/dir22"
-            // or "dir1" matches both directory "dir1" and "dir11"
-            long objID;
-            String objPrefix;
-            String objPath;
-            if (dirList.size() > 1) {
-              OmDirectoryInfo secondLastDir = dirList.get(dirList.size() - 2);
-              objID = secondLastDir.getObjectID();
-              objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX 
+ bucket.getObjectID() +
-                  OM_KEY_PREFIX + secondLastDir.getObjectID();
-              StringBuffer secondLastDirPath = new StringBuffer();
-              for (int i = 0; i < dirList.size() - 1; i++) {
-                secondLastDirPath.append(dirList.get(i).getName());
-                if (i != dirList.size() - 2) {
-                  secondLastDirPath.append(OM_KEY_PREFIX);
-                }
-              }
-              objPath = secondLastDirPath.toString();
-            } else {
-              objID = bucket.getObjectID();
-              objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX 
+ bucket.getObjectID() +
-                  OM_KEY_PREFIX + bucket.getObjectID();
-              objPath = "";
-            }
-            try {
-              SubDirectorySummary subDirSummary = getSubDirectory(objID, 
objPrefix, omMetadataManager);
-              for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
-                String subDirPath = objPath.isEmpty() ? subDir.getName() : 
objPath + OM_KEY_PREFIX + subDir.getName();
-                if (!subDir.getName().equals(TRASH_PREFIX) && 
subDirPath.startsWith(rule.getEffectivePrefix()) &&
-                    (lastDir == null || subDir.getObjectID() != 
lastDir.getObjectID())) {
-                  evaluateKeyAndDirTable(bucket, volume.getObjectID(), 
keyTable, subDirPath, subDir,
-                      Arrays.asList(rule), expiredKeyList, expiredDirList);
-                }
-              }
-            } catch (IOException e) {
-              // log failure and continue the process
-              LOG.warn("Failed to get sub directories of {} under {}/{}", 
objPrefix,
-                  bucket.getVolumeName(), bucket.getBucketName(), e);
-              return;
-            }
+          DirectoryList otherDirList = effectiveRuleList.get(j).getDirList();
+          String otherPrefix = otherRule.getEffectivePrefix();
+          if (otherPrefix.startsWith(consolidatedPrefix)) {
+            LOG.info("Rule {}'s prefix {} is sub string of rule {}'s prefix 
{}. " +
+                    " Consolidate {} into {}.", otherRule.getId(), 
otherPrefix, finalRuleIndexID,
+                consolidatedPrefix, otherRule.getId(), finalRuleIndexID);
+            consolidatedCandidate.addRule(otherRule);
+            skipEvaluatedRuleList.add(otherRule);
+          } else if (consolidatedPrefix.startsWith(otherPrefix)) {
+            LOG.info("Rule {}'s prefix {} is sub string of rule {}'s prefix 
{}. Consolidate {} int {}. ",
+                consolidatedPrefix, consolidatedPrefix, otherRule.getId(), 
otherPrefix, consolidatedPrefix,
+                otherRule.getId());
+            consolidatedPrefix = otherPrefix;
+            finalRuleIndexID = otherRule.getId();
+            finalDirList = otherDirList;
+            consolidatedCandidate.addRule(otherRule);
+            skipEvaluatedRuleList.add(otherRule);
           }
-        } else {
-          evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
-              Arrays.asList(rule), expiredKeyList, expiredDirList);
         }
+
+        consolidatedCandidate.addRule(rule);
+        consolidatedCandidate.setDirList(finalDirList);
+        consolidatedCandidate.setConsolidatedPrefix(consolidatedPrefix);
+        consolidatedRules.add(consolidatedCandidate);
       }
 
-      if (!noPrefixRuleList.isEmpty()) {
-        evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
-            noPrefixRuleList, expiredKeyList, expiredDirList);
+      // Sort the list of paths lexicographically.
+      // FSO Depth-First Search order evaluates directories in lexicographical 
order
+      // (since it retrieves entries from RocksDB sorted by name within the 
same parent).
+      // Standard string sort on logical paths separated by "/" perfectly 
matches this DFS order.
+      List<RuleListWithDirectoryList> sortedConsolidatedRules =
+          consolidatedRules.stream().sorted(new 
RuleListWithDirectoryListOrder()).collect(Collectors.toList());
+
+      LOG.info("Final consolidated rules: " +
+          
sortedConsolidatedRules.stream().map(RuleListWithDirectoryList::toString).collect(Collectors.joining(",
 ")));
+      if (test) {
+        consolidatedRuleList = sortedConsolidatedRules;
       }
+      return sortedConsolidatedRules;
+    }
+
+    private boolean canSkipDir(OmDirectoryInfo currentDir, String 
currentDirTableKey, DirectoryList dirList) {

Review Comment:
   Rescanning isn't a frequent operation, and the directory structure may 
change between scans, so exact matching isn't necessary.
   Relaxing it simplifies the logic considerably:
   
   - Record only the directory names: `lastScannedDir = "dir3/dir6/dir8"`.
   - Compare based on directory names only.
   - Starting from the root, compare segment by segment: if one side is a 
prefix of the other (ancestor/descendant relationship), do not skip; otherwise, 
skip.
   For example:
   ```java
   private boolean canSkip(String currentDirPath, String lastScannedDir) {
     if (lastScannedDir == null || currentDirPath.isEmpty()) {
       return false;
     }
     String[] cur = currentDirPath.split(OM_KEY_PREFIX);
     String[] last = lastScannedDir.split(OM_KEY_PREFIX);
     int n = Math.min(cur.length, last.length);
     for (int i = 0; i < n; i++) {
       int cmp = cur[i].compareTo(last[i]);
       if (cmp != 0) {
         return cmp > 0;        // first branch:current name => last name -> 
skip
       }
     }
     return false;
   }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -154,6 +165,22 @@ public KeyLifecycleService(OzoneManager ozoneManager,
         OZONE_KEY_LIFECYCLE_SERVICE_ENABLED_DEFAULT));
     this.moveToTrashEnabled = new 
AtomicBoolean(conf.getBoolean(OZONE_KEY_LIFECYCLE_SERVICE_MOVE_TO_TRASH_ENABLED,
         OZONE_KEY_LIFECYCLE_SERVICE_MOVE_TO_TRASH_ENABLED_DEFAULT));
+    this.stateSaveIntervalMs = 
conf.getLong(OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS,
+        OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS_DEFAULT);
+    if (!test && stateSaveIntervalMs <= 0) {
+      LOG.warn("Illegal value {} for Property {}. Set {} to {}", 
stateSaveIntervalMs,
+          OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS, 
OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS,
+          OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS_DEFAULT);
+      maxKeysProcessedPerState = 
OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS_DEFAULT;

Review Comment:
   This should assign to `stateSaveIntervalMs`, not `maxKeysProcessedPerState`.
   



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -721,53 +1025,106 @@ private SubDirectorySummary getSubDirectory(long 
dirObjID, String prefix, OMMeta
             continue;
           }
           if (dir.getParentObjectID() == dirObjID) {
-            subDirList.addSubDir(dir);
+            subDirList.addSubDir(entry.getKey(), dir, dir.getName());
           }
         }
       }
       return subDirList;
     }
 
+    private void flushAndSaveState(OmBucketInfo bucket, 
LimitedExpiredObjectList expiredKeyList, 
+        LimitedExpiredObjectList expiredDirList, OmLifecycleScanState.Builder 
scanStateBuilder) {
+      boolean saved = false;
+      if (expiredKeyList != null && !expiredKeyList.isEmpty()) {
+        if (bucket.getBucketLayout() == OBJECT_STORE) {
+          sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(), expiredKeyList,
+              false, scanStateBuilder, false);
+        } else {
+          handleAndClearFullList(bucket, expiredKeyList, false, 
scanStateBuilder, false);
+        }
+        saved = true;
+      }
+      if (expiredDirList != null && !expiredDirList.isEmpty()) {
+        if (bucket.getBucketLayout() != OBJECT_STORE) {
+          handleAndClearFullList(bucket, expiredDirList, true, 
scanStateBuilder, false);
+          saved = true;
+        }
+      }
+      if (!saved) {
+        sendSaveScanStateRequest(scanStateBuilder, false);
+      }
+      lastStateSaveTime = Time.monotonicNow();
+      lastStateSaveKeyCount = numKeyIterated;
+    }
+
     private void evaluateBucket(OmBucketInfo bucketInfo,
-        Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList, 
LimitedExpiredObjectList expiredKeyList) {
+        Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList, 
LimitedExpiredObjectList expiredKeyList,
+        OmLifecycleScanState.Builder scanStateBuilder) {
       String volumeName = bucketInfo.getVolumeName();
       String bucketName = bucketInfo.getBucketName();
+      String bucketPrefix = omMetadataManager.getBucketKey(volumeName, 
bucketName);
 
-      // use bucket name as key iterator prefix
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyTblItr =
-               keyTable.iterator(omMetadataManager.getBucketKey(volumeName, 
bucketName))) {

Review Comment:
   Is it possible to directly use 
`keyTblItr.seek(scanStateBuilder.getLastScannedKey())` and then continue with 
the original logic?
   
   A certain degree of repeated processing is acceptable, since the keyTable 
itself may change between scans anyway.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java:
##########
@@ -35,26 +36,29 @@
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmLifecycleScanState;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
 /**
  * Response for DeleteKey request.
  */
-@CleanupTableInfo(cleanupTables = {KEY_TABLE, OPEN_KEY_TABLE, DELETED_TABLE, 
BUCKET_TABLE})
+@CleanupTableInfo(cleanupTables = {KEY_TABLE, OPEN_KEY_TABLE, DELETED_TABLE, 
BUCKET_TABLE, LIFECYCLE_SCAN_STATE_TABLE})
 public class OMKeysDeleteResponse extends AbstractOMKeyDeleteResponse {
   private List<OmKeyInfo> omKeyInfoList;
   private OmBucketInfo omBucketInfo;
   private Map<String, OmKeyInfo> openKeyInfoMap = new HashMap<>();
+  private OmLifecycleScanState scanState;

Review Comment:
   This duplicates the `scanState` field of the parent class.
   



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java:
##########
@@ -162,6 +164,17 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
       String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
 
+      if (sourceType == RequestSource.LIFECYCLE && 
deleteKeyRequest.hasScanState()) {
+        if (ozoneManager.getAclsEnabled()) {

Review Comment:
   We should check the ACL in `preExecute` instead.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to