This is an automated email from the ASF dual-hosted git repository.
prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2fbf702a9c HDDS-8987. [Snapshot] Make RocksDB delete sstFile api
synchronous. Invalidate Snapshot cache before copying incremental snapshot
(#5035)
2fbf702a9c is described below
commit 2fbf702a9ccea1521e48eb5c4d692b673e69df1c
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Jul 13 13:29:57 2023 -0700
HDDS-8987. [Snapshot] Make RocksDB delete sstFile api synchronous.
Invalidate Snapshot cache before copying incremental snapshot (#5035)
---
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 5 +--
hadoop-hdds/managed-rocksdb/pom.xml | 4 ++
.../hdds/utils/db/managed/ManagedRocksDB.java | 24 +++++++++++
.../utils/db/managed/ManagedRocksObjectUtils.java | 49 ++++++++++++++++++++++
.../org/apache/ozone/rocksdb/util/RdbUtil.java | 1 -
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 27 +++++++-----
.../org/apache/hadoop/ozone/om/OzoneManager.java | 4 +-
.../hadoop/ozone/om/SstFilteringService.java | 18 +++++++-
8 files changed, 115 insertions(+), 17 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 2a507b8b53..eb724d764b 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -982,11 +982,10 @@ public final class RocksDatabase implements Closeable {
boolean isKeyWithPrefixPresent =
filterFunction.apply(firstDbKey, lastDbKey, prefixForColumnFamily);
if (!isKeyWithPrefixPresent) {
- String sstFileName = liveFileMetaData.fileName();
LOG.info("Deleting sst file {} corresponding to column family"
- + " {} from db: {}", sstFileName,
+ + " {} from db: {}", liveFileMetaData.fileName(),
liveFileMetaData.columnFamilyName(), db.get().getName());
- db.get().deleteFile(sstFileName);
+ db.deleteFile(liveFileMetaData);
}
}
}
diff --git a/hadoop-hdds/managed-rocksdb/pom.xml
b/hadoop-hdds/managed-rocksdb/pom.xml
index 47cf9415a6..573d5c0f96 100644
--- a/hadoop-hdds/managed-rocksdb/pom.xml
+++ b/hadoop-hdds/managed-rocksdb/pom.xml
@@ -36,6 +36,10 @@
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
<build/>
diff --git
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
index 6537200999..a2bb1d0f07 100644
---
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
+++
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
@@ -21,9 +21,15 @@ package org.apache.hadoop.hdds.utils.db.managed;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
+import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
import java.util.List;
/**
@@ -33,6 +39,8 @@ public class ManagedRocksDB extends ManagedObject<RocksDB> {
public static final Class<RocksDB> ORIGINAL_CLASS = RocksDB.class;
public static final int NOT_FOUND = RocksDB.NOT_FOUND;
+ static final Logger LOG = LoggerFactory.getLogger(ManagedRocksDB.class);
+
ManagedRocksDB(RocksDB original) {
super(original);
}
@@ -78,4 +86,20 @@ public class ManagedRocksDB extends ManagedObject<RocksDB> {
RocksDB.open(path, columnFamilyDescriptors, columnFamilyHandles)
);
}
+
+ /**
+ * Delete a live SST file from rocks db using the RocksDB#deleteFile Api.
+ * This function makes the RocksDB#deleteFile Api synchronous by waiting
+ * for the file to disappear from disk.
+ * @param fileToBeDeleted File to be deleted.
+ * @throws RocksDBException If the underlying db throws an exception.
+ * @throws IOException If the file is not deleted within the wait timeout.
+ */
+ public void deleteFile(LiveFileMetaData fileToBeDeleted)
+ throws RocksDBException, IOException {
+ String sstFileName = fileToBeDeleted.fileName();
+ this.get().deleteFile(sstFileName);
+ File file = new File(fileToBeDeleted.path(), fileToBeDeleted.fileName());
+ ManagedRocksObjectUtils.waitForFileDelete(file, Duration.ofSeconds(60));
+ }
}
diff --git
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
index 3885100f83..83bb0dc51d 100644
---
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
+++
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
@@ -19,10 +19,17 @@
package org.apache.hadoop.hdds.utils.db.managed;
import org.apache.hadoop.hdds.HddsUtils;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
import org.rocksdb.RocksObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+
/**
* Utilities to help assert RocksObject closures.
*/
@@ -32,6 +39,8 @@ public final class ManagedRocksObjectUtils {
public static final Logger LOG =
LoggerFactory.getLogger(ManagedRocksObjectUtils.class);
+ private static final Duration POLL_DELAY_DURATION = Duration.ZERO;
+ private static final Duration POLL_INTERVAL_DURATION =
Duration.ofMillis(100);
static void assertClosed(RocksObject rocksObject) {
assertClosed(rocksObject, null);
@@ -64,4 +73,44 @@ public final class ManagedRocksObjectUtils {
return HddsUtils.formatStackTrace(elements, 3);
}
+ /**
+ * Wait for file to be deleted.
+ * @param file File whose deletion is awaited.
+ * @param maxDuration maximum time to wait.
+ * @param interval poll interval.
+ * @param pollDelayDuration delay before the first poll.
+ * @throws IOException if the file still exists after maxDuration.
+ */
+ public static void waitForFileDelete(File file, Duration maxDuration,
+ Duration interval,
+ Duration pollDelayDuration)
+ throws IOException {
+ Instant start = Instant.now();
+ try {
+ Awaitility.with().atMost(maxDuration)
+ .pollDelay(pollDelayDuration)
+ .pollInterval(interval)
+ .await()
+ .until(() -> !file.exists());
+ LOG.info("Waited for {} milliseconds for file {} deletion.",
+ Duration.between(start, Instant.now()).toMillis(),
+ file.getAbsoluteFile());
+ } catch (ConditionTimeoutException exception) {
+ LOG.info("File: {} didn't get deleted in {} secs.",
+ file.getAbsolutePath(), maxDuration.getSeconds());
+ throw new IOException(exception);
+ }
+ }
+
+ /**
+ * Wait for file to be deleted.
+ * @param file File whose deletion is awaited.
+ * @param maxDuration maximum time to wait.
+ * @throws IOException if the file still exists after maxDuration.
+ */
+ public static void waitForFileDelete(File file, Duration maxDuration)
+ throws IOException {
+ waitForFileDelete(file, maxDuration, POLL_INTERVAL_DURATION,
+ POLL_DELAY_DURATION);
+ }
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
index 172209c584..ac17856cd8 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
@@ -58,5 +58,4 @@ public final class RdbUtil {
.collect(Collectors.toCollection(HashSet::new));
}
}
-
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 2042f5a55a..6d9cc199b5 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -686,17 +686,24 @@ public class TestOMRatisSnapshots {
.get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
}
- // There is a chance we end up checking the DBCheckpointMetrics
- // before the follower sends another request to the leader
- // to generate a checkpoint.
- // TODO: Add wait check here, to avoid flakiness.
-
// Verify the metrics
- DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
- getDBCheckpointMetrics();
- assertEquals(0, dbMetrics.getLastCheckpointStreamingNumSSTExcluded());
- assertTrue(dbMetrics.getNumIncrementalCheckpoints() >= 1);
- assertTrue(dbMetrics.getNumCheckpoints() >= 3);
+ GenericTestUtils.waitFor(() -> {
+ DBCheckpointMetrics dbMetrics =
+ leaderOM.getMetrics().getDBCheckpointMetrics();
+ return dbMetrics.getLastCheckpointStreamingNumSSTExcluded() == 0;
+ }, 100, 10000);
+
+ GenericTestUtils.waitFor(() -> {
+ DBCheckpointMetrics dbMetrics =
+ leaderOM.getMetrics().getDBCheckpointMetrics();
+ return dbMetrics.getNumIncrementalCheckpoints() >= 1;
+ }, 100, 10000);
+
+ GenericTestUtils.waitFor(() -> {
+ DBCheckpointMetrics dbMetrics =
+ leaderOM.getMetrics().getDBCheckpointMetrics();
+ return dbMetrics.getNumCheckpoints() >= 3;
+ }, 100, 10000);
// Verify RPC server is running
GenericTestUtils.waitFor(() -> {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 3c0ffbdb7b..104496257e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3577,7 +3577,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
* @return If checkpoint is installed successfully, return the
* corresponding termIndex. Otherwise, return null.
*/
- public TermIndex installSnapshotFromLeader(String leaderId) {
+ public synchronized TermIndex installSnapshotFromLeader(String leaderId) {
if (omRatisSnapshotProvider == null) {
LOG.error("OM Snapshot Provider is not configured as there are no peer "
+
"nodes.");
@@ -3632,7 +3632,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
keyManager.stop();
stopSecretManager();
stopTrashEmptier();
-
+ omSnapshotManager.getSnapshotCache().invalidateAll();
// Pause the State Machine so that no new transactions can be applied.
// This action also clears the OM Double Buffer so that if there are any
// pending transactions in the buffer, they are discarded.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index 2ad8084416..419db2f645 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
@@ -86,6 +87,8 @@ public class SstFilteringService extends BackgroundService
private AtomicLong snapshotFilteredCount;
+ private AtomicBoolean running;
+
private BooleanTriFunction<String, String, String, Boolean> filterFunction =
(first, last, prefix) -> {
String firstBucketKey = RocksDiffUtils.constructBucketKey(first);
@@ -103,11 +106,18 @@ public class SstFilteringService extends BackgroundService
.getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
snapshotFilteredCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
}
private final BootstrapStateHandler.Lock lock =
new BootstrapStateHandler.Lock();
+ @Override
+ public void start() {
+ running.set(true);
+ super.start();
+ }
+
private class SstFilteringTask implements BackgroundTask {
@Override
@@ -123,7 +133,7 @@ public class SstFilteringService extends BackgroundService
long snapshotLimit = snapshotLimitPerTask;
- while (iterator.hasNext() && snapshotLimit > 0) {
+ while (iterator.hasNext() && snapshotLimit > 0 && running.get()) {
Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
String snapShotTableKey = keyValue.getKey();
SnapshotInfo snapshotInfo = keyValue.getValue();
@@ -231,4 +241,10 @@ public class SstFilteringService extends BackgroundService
public BootstrapStateHandler.Lock getBootstrapStateLock() {
return lock;
}
+
+ @Override
+ public void shutdown() {
+ running.set(false);
+ super.shutdown();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]