This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fbf702a9c HDDS-8987. [Snapshot] Make RocksDB delete sstFile api 
synchronous. Invalidate Snapshot cache before copying incremental snapshot 
(#5035)
2fbf702a9c is described below

commit 2fbf702a9ccea1521e48eb5c4d692b673e69df1c
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Jul 13 13:29:57 2023 -0700

    HDDS-8987. [Snapshot] Make RocksDB delete sstFile api synchronous. 
Invalidate Snapshot cache before copying incremental snapshot (#5035)
---
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |  5 +--
 hadoop-hdds/managed-rocksdb/pom.xml                |  4 ++
 .../hdds/utils/db/managed/ManagedRocksDB.java      | 24 +++++++++++
 .../utils/db/managed/ManagedRocksObjectUtils.java  | 49 ++++++++++++++++++++++
 .../org/apache/ozone/rocksdb/util/RdbUtil.java     |  1 -
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 27 +++++++-----
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  4 +-
 .../hadoop/ozone/om/SstFilteringService.java       | 18 +++++++-
 8 files changed, 115 insertions(+), 17 deletions(-)

diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 2a507b8b53..eb724d764b 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -982,11 +982,10 @@ public final class RocksDatabase implements Closeable {
         boolean isKeyWithPrefixPresent =
             filterFunction.apply(firstDbKey, lastDbKey, prefixForColumnFamily);
         if (!isKeyWithPrefixPresent) {
-          String sstFileName = liveFileMetaData.fileName();
           LOG.info("Deleting sst file {} corresponding to column family"
-                  + " {} from db: {}", sstFileName,
+                  + " {} from db: {}", liveFileMetaData.fileName(),
               liveFileMetaData.columnFamilyName(), db.get().getName());
-          db.get().deleteFile(sstFileName);
+          db.deleteFile(liveFileMetaData);
         }
       }
     }
diff --git a/hadoop-hdds/managed-rocksdb/pom.xml 
b/hadoop-hdds/managed-rocksdb/pom.xml
index 47cf9415a6..573d5c0f96 100644
--- a/hadoop-hdds/managed-rocksdb/pom.xml
+++ b/hadoop-hdds/managed-rocksdb/pom.xml
@@ -36,6 +36,10 @@
       <groupId>org.rocksdb</groupId>
       <artifactId>rocksdbjni</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
   </dependencies>
 
   <build/>
diff --git 
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
index 6537200999..a2bb1d0f07 100644
--- 
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
+++ 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
@@ -21,9 +21,15 @@ package org.apache.hadoop.hdds.utils.db.managed;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.DBOptions;
+import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
 
 /**
@@ -33,6 +39,8 @@ public class ManagedRocksDB extends ManagedObject<RocksDB> {
   public static final Class<RocksDB> ORIGINAL_CLASS = RocksDB.class;
   public static final int NOT_FOUND = RocksDB.NOT_FOUND;
 
+  static final Logger LOG = LoggerFactory.getLogger(ManagedRocksDB.class);
+
   ManagedRocksDB(RocksDB original) {
     super(original);
   }
@@ -78,4 +86,20 @@ public class ManagedRocksDB extends ManagedObject<RocksDB> {
         RocksDB.open(path, columnFamilyDescriptors, columnFamilyHandles)
     );
   }
+
+  /**
+   * Delete liveMetaDataFile from rocks db using RocksDB#deleteFile Api.
+   * This function makes the RocksDB#deleteFile Api synchronized by waiting
+   * for the deletes to happen.
+   * @param fileToBeDeleted File to be deleted.
+   * @throws RocksDBException If the underlying db throws an exception.
+   * @throws IOException In the case file is not deleted.
+   */
+  public void deleteFile(LiveFileMetaData fileToBeDeleted)
+      throws RocksDBException, IOException {
+    String sstFileName = fileToBeDeleted.fileName();
+    this.get().deleteFile(sstFileName);
+    File file = new File(fileToBeDeleted.path(), fileToBeDeleted.fileName());
+    ManagedRocksObjectUtils.waitForFileDelete(file, Duration.ofSeconds(60));
+  }
 }
diff --git 
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
index 3885100f83..83bb0dc51d 100644
--- 
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
+++ 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java
@@ -19,10 +19,17 @@
 package org.apache.hadoop.hdds.utils.db.managed;
 
 import org.apache.hadoop.hdds.HddsUtils;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
 import org.rocksdb.RocksObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+
 /**
  * Utilities to help assert RocksObject closures.
  */
@@ -32,6 +39,8 @@ public final class ManagedRocksObjectUtils {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ManagedRocksObjectUtils.class);
+  private static final Duration POLL_DELAY_DURATION = Duration.ZERO;
+  private static final Duration POLL_INTERVAL_DURATION = Duration.ofMillis(100);
 
   static void assertClosed(RocksObject rocksObject) {
     assertClosed(rocksObject, null);
@@ -64,4 +73,44 @@ public final class ManagedRocksObjectUtils {
     return HddsUtils.formatStackTrace(elements, 3);
   }
 
+  /**
+   * Wait for file to be deleted.
+   * @param file File to be deleted.
+   * @param maxDuration poll max duration.
+   * @param interval poll interval.
+   * @param pollDelayDuration poll delay val.
+   * @throws IOException if the file still exists after maxDuration.
+   */
+  public static void waitForFileDelete(File file, Duration maxDuration,
+                                       Duration interval,
+                                       Duration pollDelayDuration)
+      throws IOException {
+    Instant start = Instant.now();
+    try {
+      Awaitility.with().atMost(maxDuration)
+          .pollDelay(pollDelayDuration)
+          .pollInterval(interval)
+          .await()
+          .until(() -> !file.exists());
+      LOG.info("Waited for {} milliseconds for file {} deletion.",
+          Duration.between(start, Instant.now()).toMillis(),
+          file.getAbsoluteFile());
+    } catch (ConditionTimeoutException exception) {
+      LOG.info("File: {} didn't get deleted in {} secs.",
+          file.getAbsolutePath(), maxDuration.getSeconds());
+      throw new IOException(exception);
+    }
+  }
+
+  /**
+   * Wait for file to be deleted.
+   * @param file File to be deleted.
+   * @param maxDuration poll max duration.
+   * @throws IOException in case of failure.
+   */
+  public static void waitForFileDelete(File file, Duration maxDuration)
+      throws IOException {
+    waitForFileDelete(file, maxDuration, POLL_INTERVAL_DURATION,
+        POLL_DELAY_DURATION);
+  }
 }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
index 172209c584..ac17856cd8 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
@@ -58,5 +58,4 @@ public final class RdbUtil {
           .collect(Collectors.toCollection(HashSet::new));
     }
   }
-
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 2042f5a55a..6d9cc199b5 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -686,17 +686,24 @@ public class TestOMRatisSnapshots {
           .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
     }
 
-    // There is a chance we end up checking the DBCheckpointMetrics
-    // before the follower sends another request to the leader
-    // to generate a checkpoint.
-    // TODO: Add wait check here, to avoid flakiness.
-
     // Verify the metrics
-    DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
-        getDBCheckpointMetrics();
-    assertEquals(0, dbMetrics.getLastCheckpointStreamingNumSSTExcluded());
-    assertTrue(dbMetrics.getNumIncrementalCheckpoints() >= 1);
-    assertTrue(dbMetrics.getNumCheckpoints() >= 3);
+    GenericTestUtils.waitFor(() -> {
+      DBCheckpointMetrics dbMetrics =
+          leaderOM.getMetrics().getDBCheckpointMetrics();
+      return dbMetrics.getLastCheckpointStreamingNumSSTExcluded() == 0;
+    }, 100, 10000);
+
+    GenericTestUtils.waitFor(() -> {
+      DBCheckpointMetrics dbMetrics =
+          leaderOM.getMetrics().getDBCheckpointMetrics();
+      return dbMetrics.getNumIncrementalCheckpoints() >= 1;
+    }, 100, 10000);
+
+    GenericTestUtils.waitFor(() -> {
+      DBCheckpointMetrics dbMetrics =
+          leaderOM.getMetrics().getDBCheckpointMetrics();
+      return dbMetrics.getNumCheckpoints() >= 3;
+    }, 100, 10000);
 
     // Verify RPC server is running
     GenericTestUtils.waitFor(() -> {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 3c0ffbdb7b..104496257e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3577,7 +3577,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * @return If checkpoint is installed successfully, return the
    *         corresponding termIndex. Otherwise, return null.
    */
-  public TermIndex installSnapshotFromLeader(String leaderId) {
+  public synchronized TermIndex installSnapshotFromLeader(String leaderId) {
     if (omRatisSnapshotProvider == null) {
       LOG.error("OM Snapshot Provider is not configured as there are no peer " +
           "nodes.");
@@ -3632,7 +3632,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       keyManager.stop();
       stopSecretManager();
       stopTrashEmptier();
-
+      omSnapshotManager.getSnapshotCache().invalidateAll();
       // Pause the State Machine so that no new transactions can be applied.
       // This action also clears the OM Double Buffer so that if there are any
       // pending transactions in the buffer, they are discarded.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index 2ad8084416..419db2f645 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
@@ -86,6 +87,8 @@ public class SstFilteringService extends BackgroundService
 
   private AtomicLong snapshotFilteredCount;
 
+  private AtomicBoolean running;
+
   private BooleanTriFunction<String, String, String, Boolean> filterFunction =
       (first, last, prefix) -> {
         String firstBucketKey = RocksDiffUtils.constructBucketKey(first);
@@ -103,11 +106,18 @@ public class SstFilteringService extends BackgroundService
         .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
             SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
     snapshotFilteredCount = new AtomicLong(0);
+    running = new AtomicBoolean(false);
   }
 
   private final BootstrapStateHandler.Lock lock =
       new BootstrapStateHandler.Lock();
 
+  @Override
+  public void start() {
+    running.set(true);
+    super.start();
+  }
+
   private class SstFilteringTask implements BackgroundTask {
 
     @Override
@@ -123,7 +133,7 @@ public class SstFilteringService extends BackgroundService
 
         long snapshotLimit = snapshotLimitPerTask;
 
-        while (iterator.hasNext() && snapshotLimit > 0) {
+        while (iterator.hasNext() && snapshotLimit > 0 && running.get()) {
           Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
           String snapShotTableKey = keyValue.getKey();
           SnapshotInfo snapshotInfo = keyValue.getValue();
@@ -231,4 +241,10 @@ public class SstFilteringService extends BackgroundService
   public BootstrapStateHandler.Lock getBootstrapStateLock() {
     return lock;
   }
+
+  @Override
+  public void shutdown() {
+    running.set(false);
+    super.shutdown();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to