This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a029def5a HDDS-8282. [Snapshot] Intermittent DB crash in RocksDBCheckpointDiffer (#4492)
0a029def5a is described below

commit 0a029def5aa89c408711f7f2ce2de39d7a8e48b9
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Mar 30 02:09:41 2023 +0200

    HDDS-8282. [Snapshot] Intermittent DB crash in RocksDBCheckpointDiffer (#4492)
---
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   2 +
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 128 ++++++++++++---------
 2 files changed, 73 insertions(+), 57 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 7a0cf350f3..31bfccdb86 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -31,6 +31,7 @@ import java.util.Set;
 
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.RocksDBStoreMBean;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
@@ -221,6 +222,7 @@ public class RDBStore implements DBStore {
 
     RDBMetrics.unRegister();
     checkPointManager.close();
+    IOUtils.closeQuietly(rocksDBCheckpointDiffer);
     db.close();
   }
 
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 31525aae1b..8955cf64a5 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -167,6 +167,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
   private final Object compactionListenerWriteLock = new Object();
 
   private final ScheduledExecutorService executor;
+  private boolean closed;
   private final long maxAllowedTimeInDag;
 
   private ColumnFamilyHandle snapshotInfoTableCFHandle;
@@ -303,9 +304,14 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
   }
 
   @Override
-  public void close() throws Exception {
-    if (executor != null) {
-      executor.shutdown();
+  public void close() {
+    synchronized (compactionListenerWriteLock) {
+      if (!closed) {
+        closed = true;
+        if (executor != null) {
+          executor.shutdown();
+        }
+      }
     }
   }
 
@@ -457,9 +463,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
       public void onCompactionBegin(RocksDB db,
           CompactionJobInfo compactionJobInfo) {
 
-        // Skip compaction DAG tracking if the snapshotInfoTable is empty.
-        // i.e. No snapshot exists in OM.
-        if (isSnapshotInfoTableEmpty(db)) {
+        if (compactionJobInfo.inputFiles().size() == 0) {
+          LOG.error("Compaction input files list is empty");
           return;
         }
 
@@ -468,9 +473,13 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
         // all SST files.
 
         synchronized (compactionListenerWriteLock) {
+          if (closed) {
+            return;
+          }
 
-          if (compactionJobInfo.inputFiles().size() == 0) {
-            LOG.error("Compaction input files list is empty");
+          // Skip compaction DAG tracking if the snapshotInfoTable is empty.
+          // i.e. No snapshot exists in OM.
+          if (isSnapshotInfoTableEmpty(db)) {
             return;
           }
 
@@ -507,70 +516,75 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
       public void onCompactionCompleted(RocksDB db,
           CompactionJobInfo compactionJobInfo) {
 
-        // Skip compaction DAG tracking if the snapshotInfoTable is empty.
-        // i.e. No snapshot exists in OM.
-        if (isSnapshotInfoTableEmpty(db)) {
+        if (compactionJobInfo.inputFiles().isEmpty()) {
+          LOG.error("Compaction input files list is empty");
           return;
         }
 
-        synchronized (compactionListenerWriteLock) {
+        if (new HashSet<>(compactionJobInfo.inputFiles())
+            .equals(new HashSet<>(compactionJobInfo.outputFiles()))) {
+          LOG.info("Skipped the compaction entry. Compaction input files: " +
+                  "{} and output files: {} are same.",
+              compactionJobInfo.inputFiles(),
+              compactionJobInfo.outputFiles());
+          return;
+        }
 
-          if (compactionJobInfo.inputFiles().isEmpty()) {
-            LOG.error("Compaction input files list is empty");
-            return;
-          }
+        final StringBuilder sb = new StringBuilder();
 
-          if (new HashSet<>(compactionJobInfo.inputFiles())
-              .equals(new HashSet<>(compactionJobInfo.outputFiles()))) {
-            LOG.info("Skipped the compaction entry. Compaction input files: " +
-                "{} and output files: {} are same.",
-                compactionJobInfo.inputFiles(),
-                compactionJobInfo.outputFiles());
-            return;
-          }
+        if (LOG.isDebugEnabled()) {
+          // Print compaction reason for this entry in the log file
+          // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
+          sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
+              .append(compactionJobInfo.compactionReason())
+              .append('\n');
+        }
 
-          final StringBuilder sb = new StringBuilder();
+        // Mark the beginning of a compaction log
+        sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
 
-          if (LOG.isDebugEnabled()) {
-            // Print compaction reason for this entry in the log file
-            // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
-            sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
-                .append(compactionJobInfo.compactionReason())
-                .append('\n');
-          }
+        // Trim DB path, only keep the SST file name
+        final int filenameOffset =
+            compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
+
+        // Append the list of input files
+        final List<String> inputFiles = compactionJobInfo.inputFiles();
+        // Trim the file path, leave only the SST file name without extension
+        inputFiles.replaceAll(s -> s.substring(
+            filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+        final String inputFilesJoined =
+            String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
+        sb.append(inputFilesJoined);
 
-          // Mark the beginning of a compaction log
-          sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
+        // Insert delimiter between input files and output files
+        sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
 
-          // Trim DB path, only keep the SST file name
-          final int filenameOffset =
-              compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
+        // Append the list of output files
+        final List<String> outputFiles = compactionJobInfo.outputFiles();
+        outputFiles.replaceAll(s -> s.substring(
+            filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+        final String outputFilesJoined =
+            String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
+        sb.append(outputFilesJoined);
 
-          // Append the list of input files
-          final List<String> inputFiles = compactionJobInfo.inputFiles();
-          // Trim the file path, leave only the SST file name without extension
-          inputFiles.replaceAll(s -> s.substring(
-              filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
-          final String inputFilesJoined =
-              String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
-          sb.append(inputFilesJoined);
+        // End of line
+        sb.append('\n');
 
-          // Insert delimiter between input files and output files
-          sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+        String content = sb.toString();
 
-          // Append the list of output files
-          final List<String> outputFiles = compactionJobInfo.outputFiles();
-          outputFiles.replaceAll(s -> s.substring(
-              filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
-          final String outputFilesJoined =
-              String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
-          sb.append(outputFilesJoined);
+        synchronized (compactionListenerWriteLock) {
+          if (closed) {
+            return;
+          }
 
-          // End of line
-          sb.append('\n');
+          // Skip compaction DAG tracking if the snapshotInfoTable is empty.
+          // i.e. No snapshot exists in OM.
+          if (isSnapshotInfoTableEmpty(db)) {
+            return;
+          }
 
           // Write input and output file names to compaction log
-          appendToCurrentCompactionLog(sb.toString());
+          appendToCurrentCompactionLog(content);
 
           // Populate the DAG
           // TODO: Once SnapshotChainManager is put into use, set snapshotID to


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to