This is an automated email from the ASF dual-hosted git repository.

smengcl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c13a0c0fcb4 HDDS-15166. Increased cleanup interval and purged partial reports during cleanup. (#10250)
c13a0c0fcb4 is described below

commit c13a0c0fcb478266aec94f5aa9eb8df7265abad6
Author: SaketaChalamchala <[email protected]>
AuthorDate: Fri May 22 01:38:48 2026 -0700

    HDDS-15166. Increased cleanup interval and purged partial reports during cleanup. (#10250)
---
 .../common/src/main/resources/ozone-default.xml    |  2 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  2 +-
 .../om/service/SnapshotDiffCleanupService.java     | 33 ++++++++-----
 .../om/service/TestSnapshotDiffCleanupService.java | 57 ++++++++++++++++------
 4 files changed, 65 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b681e3bdd2e..c85d877e608 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4777,7 +4777,7 @@
 
   <property>
     <name>ozone.om.snapshot.diff.cleanup.service.run.interval</name>
-    <value>1m</value>
+    <value>60m</value>
     <tag>OZONE, OM</tag>
     <description>
       Interval at which snapshot diff clean up service will run.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index a773db9ccad..a43d3fb2598 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -621,7 +621,7 @@ public final class OMConfigKeys {
       = "ozone.om.snapshot.cache.cleanup.service.run.interval";
   public static final long
       OZONE_OM_SNAPSHOT_DIFF_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT
-      = TimeUnit.MINUTES.toMillis(1);
+      = TimeUnit.MINUTES.toMillis(60);
   public static final long
       OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT
       = TimeUnit.MINUTES.toMillis(1);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
index d4d759a4e4e..9aaeafa1c96 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
@@ -45,12 +45,17 @@
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Background service to clean-up snapDiff jobs which are stable and
  * corresponding reports.
  */
 public class SnapshotDiffCleanupService extends BackgroundService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDiffCleanupService.class);
+
   // Use only a single thread for Snapshot Diff cleanup.
   // Multiple threads would read from the same table and can send deletion
   // requests for same snapshot diff job multiple times.
@@ -118,8 +123,11 @@ public void run() {
     // In clean report table first and them move jobs to purge table approach,
     // assumption is that by the next cleanup run, there is no purged snapDiff
     // job reading from report table.
-    removeOlderJobReport();
-    moveOldSnapDiffJobsToPurgeTable();
+    long purgedReportJobs = removeOlderJobReport();
+    long movedJobsToPurgeTable = moveOldSnapDiffJobsToPurgeTable();
+    LOG.info("Snapshot diff cleanup run completed. Purged report jobs: {}, " +
+            "moved jobs to purge table: {}.",
+        purgedReportJobs, movedJobsToPurgeTable);
   }
 
   @VisibleForTesting
@@ -144,7 +152,7 @@ public byte[] getEntryFromPurgedJobTable(String jobId) {
    * than the {@link SnapshotDiffCleanupService#maxAllowedTime}.
    * `maxAllowedTime` is the time, a snapDiff job and its report is persisted.
    */
-  private void moveOldSnapDiffJobsToPurgeTable() {
+  private long moveOldSnapDiffJobsToPurgeTable() {
     try (ManagedRocksIterator iterator =
              new ManagedRocksIterator(db.get().newIterator(snapDiffJobCfh));
          ManagedWriteBatch writeBatch = new ManagedWriteBatch();
@@ -175,35 +183,34 @@ private void moveOldSnapDiffJobsToPurgeTable() {
       }
 
       db.get().write(writeOptions, writeBatch);
+      return purgeJobCount;
     } catch (IOException | RocksDBException e) {
       // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(e);
     }
   }
 
-  private void removeOlderJobReport() {
+  private long removeOlderJobReport() {
     try (ManagedRocksIterator rocksIterator = new ManagedRocksIterator(
         db.get().newIterator(snapDiffPurgedJobCfh));
          ManagedWriteBatch writeBatch = new ManagedWriteBatch();
          ManagedWriteOptions writeOptions = new ManagedWriteOptions()) {
+      long purgedReportJobs = 0;
       rocksIterator.get().seekToFirst();
       while (rocksIterator.get().isValid()) {
         byte[] key = rocksIterator.get().key();
-        byte[] value = rocksIterator.get().value();
         rocksIterator.get().next();
         String prefix = codecRegistry.asObject(key, String.class);
-        long totalNumberOfEntries = codecRegistry.asObject(value, Long.class);
-
-        if (totalNumberOfEntries > 0) {
-          byte[] beginKey = codecRegistry.asRawData(prefix + DELIMITER + 0);
-          byte[] endKey = codecRegistry.asRawData(StringUtils.getLexicographicallyHigherString(prefix + DELIMITER));
-          // Delete Range excludes the endKey.
-          writeBatch.deleteRange(snapDiffReportCfh, beginKey, endKey);
-        }
+        byte[] beginKey = codecRegistry.asRawData(prefix + DELIMITER + 0);
+        byte[] endKey = codecRegistry.asRawData(StringUtils.getLexicographicallyHigherString(prefix + DELIMITER));
+        // Delete Range excludes the endKey.
+        writeBatch.deleteRange(snapDiffReportCfh, beginKey, endKey);
         // Finally, remove the entry from the purged job table.
         writeBatch.delete(snapDiffPurgedJobCfh, key);
+        purgedReportJobs++;
       }
       db.get().write(writeOptions, writeBatch);
+      return purgedReportJobs;
     } catch (IOException | RocksDBException e) {
       // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(e);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
index 25947fae645..03b00eb516e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
@@ -82,9 +82,6 @@ public class TestSnapshotDiffCleanupService {
       StringUtils.string2Bytes("snap-diff-purged-job-table");
   private final byte[] reportTableNameBytes =
       StringUtils.string2Bytes("snap-diff-report-table");
-  private ColumnFamilyDescriptor jobTableCfd;
-  private ColumnFamilyDescriptor purgedJobTableCfd;
-  private ColumnFamilyDescriptor reportTableCfd;
   private ColumnFamilyHandle jobTableCfh;
   private ColumnFamilyHandle purgedJobTableCfh;
   private ColumnFamilyHandle reportTableCfh;
@@ -154,11 +151,11 @@ public void init() throws RocksDBException, IOException {
 
     when(ozoneManager.getConfiguration()).thenReturn(config);
 
-    jobTableCfd = new ColumnFamilyDescriptor(jobTableNameBytes,
+    ColumnFamilyDescriptor jobTableCfd = new ColumnFamilyDescriptor(jobTableNameBytes,
         columnFamilyOptions);
-    reportTableCfd = new ColumnFamilyDescriptor(reportTableNameBytes,
+    ColumnFamilyDescriptor reportTableCfd = new ColumnFamilyDescriptor(reportTableNameBytes,
         columnFamilyOptions);
-    purgedJobTableCfd = new ColumnFamilyDescriptor(purgedJobTableNameBytes,
+    ColumnFamilyDescriptor purgedJobTableCfd = new ColumnFamilyDescriptor(purgedJobTableNameBytes,
         columnFamilyOptions);
     jobTableCfh = db.get().createColumnFamily(jobTableCfd);
     purgedJobTableCfh = db.get().createColumnFamily(purgedJobTableCfd);
@@ -191,22 +188,24 @@ public void tearDown() {
       diffCleanupService.shutdown();
     }
     if (jobTableCfh != null) {
+      dropColumnFamily(jobTableCfh);
       jobTableCfh.close();
     }
     if (purgedJobTableCfh != null) {
+      dropColumnFamily(purgedJobTableCfh);
       purgedJobTableCfh.close();
     }
     if (reportTableCfh != null) {
+      dropColumnFamily(reportTableCfh);
       reportTableCfh.close();
     }
-    if (jobTableCfd != null) {
-      ManagedColumnFamilyOptions.closeDeeply(jobTableCfd.getOptions());
-    }
-    if (purgedJobTableCfd != null) {
-      ManagedColumnFamilyOptions.closeDeeply(purgedJobTableCfd.getOptions());
-    }
-    if (reportTableCfd != null) {
-      ManagedColumnFamilyOptions.closeDeeply(reportTableCfd.getOptions());
+  }
+
+  private void dropColumnFamily(ColumnFamilyHandle columnFamilyHandle) {
+    try {
+      db.get().dropColumnFamily(columnFamilyHandle);
+    } catch (RocksDBException exception) {
+      throw new RuntimeException("Failed to drop column family.", exception);
     }
   }
 
@@ -283,6 +282,27 @@ public void testSnapshotDiffCleanUpService()
     assertNumberOfEntriesInTable(reportTableCfh, 19);
   }
 
+  @Test
+  public void testCleanupRemovesReportEntriesForZeroEntryPurgedJob()
+      throws RocksDBException, IOException {
+    diffCleanupService.suspend();
+
+    long currentTime = System.currentTimeMillis() - 1;
+    SnapshotDiffJob failedJob = addJobAndReport(FAILED, currentTime, 0);
+    addReportEntries(failedJob.getJobId(), 2);
+
+    diffCleanupService.resume();
+
+    diffCleanupService.run();
+    assertJobInPurgedTable(failedJob.getJobId(),
+        failedJob.getTotalDiffEntries());
+    assertReport(failedJob.getJobId(), 2, emptyReportEntry);
+
+    diffCleanupService.run();
+    assertNumberOfEntriesInTable(purgedJobTableCfh, 0);
+    assertReport(failedJob.getJobId(), 2, null);
+  }
+
   private SnapshotDiffJob addJobAndReport(JobStatus jobStatus,
                                           long creationTime,
                                           long noOfEntries)
@@ -315,6 +335,15 @@ private SnapshotDiffJob addJobAndReport(JobStatus jobStatus,
     return job;
   }
 
+  private void addReportEntries(String jobId, int noOfEntries)
+      throws IOException, RocksDBException {
+    for (int i = 0; i < noOfEntries; i++) {
+      db.get().put(reportTableCfh,
+          codecRegistry.asRawData(jobId + DELIMITER + i),
+          emptyReportEntry);
+    }
+  }
+
   private void assertJobAndReport(SnapshotDiffJob expectedJob,
                                   boolean isExpected)
       throws IOException, RocksDBException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to