This is an automated email from the ASF dual-hosted git repository.
smengcl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c13a0c0fcb4 HDDS-15166. Increased cleanup interval and purged partial
reports during cleanup. (#10250)
c13a0c0fcb4 is described below
commit c13a0c0fcb478266aec94f5aa9eb8df7265abad6
Author: SaketaChalamchala <[email protected]>
AuthorDate: Fri May 22 01:38:48 2026 -0700
HDDS-15166. Increased cleanup interval and purged partial reports during
cleanup. (#10250)
---
.../common/src/main/resources/ozone-default.xml | 2 +-
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 2 +-
.../om/service/SnapshotDiffCleanupService.java | 33 ++++++++-----
.../om/service/TestSnapshotDiffCleanupService.java | 57 ++++++++++++++++------
4 files changed, 65 insertions(+), 29 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b681e3bdd2e..c85d877e608 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4777,7 +4777,7 @@
<property>
<name>ozone.om.snapshot.diff.cleanup.service.run.interval</name>
- <value>1m</value>
+ <value>60m</value>
<tag>OZONE, OM</tag>
<description>
Interval at which snapshot diff clean up service will run.
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index a773db9ccad..a43d3fb2598 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -621,7 +621,7 @@ public final class OMConfigKeys {
= "ozone.om.snapshot.cache.cleanup.service.run.interval";
public static final long
OZONE_OM_SNAPSHOT_DIFF_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT
- = TimeUnit.MINUTES.toMillis(1);
+ = TimeUnit.MINUTES.toMillis(60);
public static final long
OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT
= TimeUnit.MINUTES.toMillis(1);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
index d4d759a4e4e..9aaeafa1c96 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
@@ -45,12 +45,17 @@
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Background service to clean-up snapDiff jobs which are stable and
* corresponding reports.
*/
public class SnapshotDiffCleanupService extends BackgroundService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDiffCleanupService.class);
+
// Use only a single thread for Snapshot Diff cleanup.
// Multiple threads would read from the same table and can send deletion
// requests for same snapshot diff job multiple times.
@@ -118,8 +123,11 @@ public void run() {
// In clean report table first and then move jobs to purge table approach,
// assumption is that by the next cleanup run, there is no purged snapDiff
// job reading from report table.
- removeOlderJobReport();
- moveOldSnapDiffJobsToPurgeTable();
+ long purgedReportJobs = removeOlderJobReport();
+ long movedJobsToPurgeTable = moveOldSnapDiffJobsToPurgeTable();
+ LOG.info("Snapshot diff cleanup run completed. Purged report jobs: {}, " +
+ "moved jobs to purge table: {}.",
+ purgedReportJobs, movedJobsToPurgeTable);
}
@VisibleForTesting
@@ -144,7 +152,7 @@ public byte[] getEntryFromPurgedJobTable(String jobId) {
* than the {@link SnapshotDiffCleanupService#maxAllowedTime}.
* `maxAllowedTime` is how long a snapDiff job and its report are persisted.
*/
- private void moveOldSnapDiffJobsToPurgeTable() {
+ private long moveOldSnapDiffJobsToPurgeTable() {
try (ManagedRocksIterator iterator =
new ManagedRocksIterator(db.get().newIterator(snapDiffJobCfh));
ManagedWriteBatch writeBatch = new ManagedWriteBatch();
@@ -175,35 +183,34 @@ private void moveOldSnapDiffJobsToPurgeTable() {
}
db.get().write(writeOptions, writeBatch);
+ return purgeJobCount;
} catch (IOException | RocksDBException e) {
// TODO: [SNAPSHOT] Fail gracefully.
throw new RuntimeException(e);
}
}
- private void removeOlderJobReport() {
+ private long removeOlderJobReport() {
try (ManagedRocksIterator rocksIterator = new ManagedRocksIterator(
db.get().newIterator(snapDiffPurgedJobCfh));
ManagedWriteBatch writeBatch = new ManagedWriteBatch();
ManagedWriteOptions writeOptions = new ManagedWriteOptions()) {
+ long purgedReportJobs = 0;
rocksIterator.get().seekToFirst();
while (rocksIterator.get().isValid()) {
byte[] key = rocksIterator.get().key();
- byte[] value = rocksIterator.get().value();
rocksIterator.get().next();
String prefix = codecRegistry.asObject(key, String.class);
- long totalNumberOfEntries = codecRegistry.asObject(value, Long.class);
-
- if (totalNumberOfEntries > 0) {
- byte[] beginKey = codecRegistry.asRawData(prefix + DELIMITER + 0);
- byte[] endKey =
codecRegistry.asRawData(StringUtils.getLexicographicallyHigherString(prefix +
DELIMITER));
- // Delete Range excludes the endKey.
- writeBatch.deleteRange(snapDiffReportCfh, beginKey, endKey);
- }
+ byte[] beginKey = codecRegistry.asRawData(prefix + DELIMITER + 0);
+ byte[] endKey =
codecRegistry.asRawData(StringUtils.getLexicographicallyHigherString(prefix +
DELIMITER));
+ // Delete Range excludes the endKey.
+ writeBatch.deleteRange(snapDiffReportCfh, beginKey, endKey);
// Finally, remove the entry from the purged job table.
writeBatch.delete(snapDiffPurgedJobCfh, key);
+ purgedReportJobs++;
}
db.get().write(writeOptions, writeBatch);
+ return purgedReportJobs;
} catch (IOException | RocksDBException e) {
// TODO: [SNAPSHOT] Fail gracefully.
throw new RuntimeException(e);
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
index 25947fae645..03b00eb516e 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
@@ -82,9 +82,6 @@ public class TestSnapshotDiffCleanupService {
StringUtils.string2Bytes("snap-diff-purged-job-table");
private final byte[] reportTableNameBytes =
StringUtils.string2Bytes("snap-diff-report-table");
- private ColumnFamilyDescriptor jobTableCfd;
- private ColumnFamilyDescriptor purgedJobTableCfd;
- private ColumnFamilyDescriptor reportTableCfd;
private ColumnFamilyHandle jobTableCfh;
private ColumnFamilyHandle purgedJobTableCfh;
private ColumnFamilyHandle reportTableCfh;
@@ -154,11 +151,11 @@ public void init() throws RocksDBException, IOException {
when(ozoneManager.getConfiguration()).thenReturn(config);
- jobTableCfd = new ColumnFamilyDescriptor(jobTableNameBytes,
+ ColumnFamilyDescriptor jobTableCfd = new
ColumnFamilyDescriptor(jobTableNameBytes,
columnFamilyOptions);
- reportTableCfd = new ColumnFamilyDescriptor(reportTableNameBytes,
+ ColumnFamilyDescriptor reportTableCfd = new
ColumnFamilyDescriptor(reportTableNameBytes,
columnFamilyOptions);
- purgedJobTableCfd = new ColumnFamilyDescriptor(purgedJobTableNameBytes,
+ ColumnFamilyDescriptor purgedJobTableCfd = new
ColumnFamilyDescriptor(purgedJobTableNameBytes,
columnFamilyOptions);
jobTableCfh = db.get().createColumnFamily(jobTableCfd);
purgedJobTableCfh = db.get().createColumnFamily(purgedJobTableCfd);
@@ -191,22 +188,24 @@ public void tearDown() {
diffCleanupService.shutdown();
}
if (jobTableCfh != null) {
+ dropColumnFamily(jobTableCfh);
jobTableCfh.close();
}
if (purgedJobTableCfh != null) {
+ dropColumnFamily(purgedJobTableCfh);
purgedJobTableCfh.close();
}
if (reportTableCfh != null) {
+ dropColumnFamily(reportTableCfh);
reportTableCfh.close();
}
- if (jobTableCfd != null) {
- ManagedColumnFamilyOptions.closeDeeply(jobTableCfd.getOptions());
- }
- if (purgedJobTableCfd != null) {
- ManagedColumnFamilyOptions.closeDeeply(purgedJobTableCfd.getOptions());
- }
- if (reportTableCfd != null) {
- ManagedColumnFamilyOptions.closeDeeply(reportTableCfd.getOptions());
+ }
+
+ private void dropColumnFamily(ColumnFamilyHandle columnFamilyHandle) {
+ try {
+ db.get().dropColumnFamily(columnFamilyHandle);
+ } catch (RocksDBException exception) {
+ throw new RuntimeException("Failed to drop column family.", exception);
}
}
@@ -283,6 +282,27 @@ public void testSnapshotDiffCleanUpService()
assertNumberOfEntriesInTable(reportTableCfh, 19);
}
+ @Test
+ public void testCleanupRemovesReportEntriesForZeroEntryPurgedJob()
+ throws RocksDBException, IOException {
+ diffCleanupService.suspend();
+
+ long currentTime = System.currentTimeMillis() - 1;
+ SnapshotDiffJob failedJob = addJobAndReport(FAILED, currentTime, 0);
+ addReportEntries(failedJob.getJobId(), 2);
+
+ diffCleanupService.resume();
+
+ diffCleanupService.run();
+ assertJobInPurgedTable(failedJob.getJobId(),
+ failedJob.getTotalDiffEntries());
+ assertReport(failedJob.getJobId(), 2, emptyReportEntry);
+
+ diffCleanupService.run();
+ assertNumberOfEntriesInTable(purgedJobTableCfh, 0);
+ assertReport(failedJob.getJobId(), 2, null);
+ }
+
private SnapshotDiffJob addJobAndReport(JobStatus jobStatus,
long creationTime,
long noOfEntries)
@@ -315,6 +335,15 @@ private SnapshotDiffJob addJobAndReport(JobStatus
jobStatus,
return job;
}
+ private void addReportEntries(String jobId, int noOfEntries)
+ throws IOException, RocksDBException {
+ for (int i = 0; i < noOfEntries; i++) {
+ db.get().put(reportTableCfh,
+ codecRegistry.asRawData(jobId + DELIMITER + i),
+ emptyReportEntry);
+ }
+ }
+
private void assertJobAndReport(SnapshotDiffJob expectedJob,
boolean isExpected)
throws IOException, RocksDBException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]