This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0a029def5a HDDS-8282. [Snapshot] Intermittent DB crash in RocksDBCheckpointDiffer (#4492)
0a029def5a is described below
commit 0a029def5aa89c408711f7f2ce2de39d7a8e48b9
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Mar 30 02:09:41 2023 +0200
HDDS-8282. [Snapshot] Intermittent DB crash in RocksDBCheckpointDiffer (#4492)
---
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 2 +
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 128 ++++++++++++---------
2 files changed, 73 insertions(+), 57 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 7a0cf350f3..31bfccdb86 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.RocksDBStoreMBean;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
@@ -221,6 +222,7 @@ public class RDBStore implements DBStore {
RDBMetrics.unRegister();
checkPointManager.close();
+ IOUtils.closeQuietly(rocksDBCheckpointDiffer);
db.close();
}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 31525aae1b..8955cf64a5 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -167,6 +167,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
private final Object compactionListenerWriteLock = new Object();
private final ScheduledExecutorService executor;
+ private boolean closed;
private final long maxAllowedTimeInDag;
private ColumnFamilyHandle snapshotInfoTableCFHandle;
@@ -303,9 +304,14 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
}
@Override
- public void close() throws Exception {
- if (executor != null) {
- executor.shutdown();
+ public void close() {
+ synchronized (compactionListenerWriteLock) {
+ if (!closed) {
+ closed = true;
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
}
}
@@ -457,9 +463,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
public void onCompactionBegin(RocksDB db,
CompactionJobInfo compactionJobInfo) {
- // Skip compaction DAG tracking if the snapshotInfoTable is empty.
- // i.e. No snapshot exists in OM.
- if (isSnapshotInfoTableEmpty(db)) {
+ if (compactionJobInfo.inputFiles().size() == 0) {
+ LOG.error("Compaction input files list is empty");
return;
}
@@ -468,9 +473,13 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
// all SST files.
synchronized (compactionListenerWriteLock) {
+ if (closed) {
+ return;
+ }
- if (compactionJobInfo.inputFiles().size() == 0) {
- LOG.error("Compaction input files list is empty");
+ // Skip compaction DAG tracking if the snapshotInfoTable is empty.
+ // i.e. No snapshot exists in OM.
+ if (isSnapshotInfoTableEmpty(db)) {
return;
}
@@ -507,70 +516,75 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
public void onCompactionCompleted(RocksDB db,
CompactionJobInfo compactionJobInfo) {
- // Skip compaction DAG tracking if the snapshotInfoTable is empty.
- // i.e. No snapshot exists in OM.
- if (isSnapshotInfoTableEmpty(db)) {
+ if (compactionJobInfo.inputFiles().isEmpty()) {
+ LOG.error("Compaction input files list is empty");
return;
}
- synchronized (compactionListenerWriteLock) {
+ if (new HashSet<>(compactionJobInfo.inputFiles())
+ .equals(new HashSet<>(compactionJobInfo.outputFiles()))) {
+ LOG.info("Skipped the compaction entry. Compaction input files: " +
+ "{} and output files: {} are same.",
+ compactionJobInfo.inputFiles(),
+ compactionJobInfo.outputFiles());
+ return;
+ }
- if (compactionJobInfo.inputFiles().isEmpty()) {
- LOG.error("Compaction input files list is empty");
- return;
- }
+ final StringBuilder sb = new StringBuilder();
- if (new HashSet<>(compactionJobInfo.inputFiles())
- .equals(new HashSet<>(compactionJobInfo.outputFiles()))) {
- LOG.info("Skipped the compaction entry. Compaction input files: " +
- "{} and output files: {} are same.",
- compactionJobInfo.inputFiles(),
- compactionJobInfo.outputFiles());
- return;
- }
+ if (LOG.isDebugEnabled()) {
+ // Print compaction reason for this entry in the log file
+ // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
+ sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
+ .append(compactionJobInfo.compactionReason())
+ .append('\n');
+ }
- final StringBuilder sb = new StringBuilder();
+ // Mark the beginning of a compaction log
+ sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
- if (LOG.isDebugEnabled()) {
- // Print compaction reason for this entry in the log file
- // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
- sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
- .append(compactionJobInfo.compactionReason())
- .append('\n');
- }
+ // Trim DB path, only keep the SST file name
+ final int filenameOffset =
+ compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
+
+ // Append the list of input files
+ final List<String> inputFiles = compactionJobInfo.inputFiles();
+ // Trim the file path, leave only the SST file name without extension
+ inputFiles.replaceAll(s -> s.substring(
+ filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+ final String inputFilesJoined =
+ String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
+ sb.append(inputFilesJoined);
- // Mark the beginning of a compaction log
- sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
+ // Insert delimiter between input files and output files
+ sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
- // Trim DB path, only keep the SST file name
- final int filenameOffset =
- compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
+ // Append the list of output files
+ final List<String> outputFiles = compactionJobInfo.outputFiles();
+ outputFiles.replaceAll(s -> s.substring(
+ filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+ final String outputFilesJoined =
+ String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
+ sb.append(outputFilesJoined);
- // Append the list of input files
- final List<String> inputFiles = compactionJobInfo.inputFiles();
- // Trim the file path, leave only the SST file name without extension
- inputFiles.replaceAll(s -> s.substring(
- filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
- final String inputFilesJoined =
- String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
- sb.append(inputFilesJoined);
+ // End of line
+ sb.append('\n');
- // Insert delimiter between input files and output files
- sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+ String content = sb.toString();
- // Append the list of output files
- final List<String> outputFiles = compactionJobInfo.outputFiles();
- outputFiles.replaceAll(s -> s.substring(
- filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
- final String outputFilesJoined =
- String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
- sb.append(outputFilesJoined);
+ synchronized (compactionListenerWriteLock) {
+ if (closed) {
+ return;
+ }
- // End of line
- sb.append('\n');
+ // Skip compaction DAG tracking if the snapshotInfoTable is empty.
+ // i.e. No snapshot exists in OM.
+ if (isSnapshotInfoTableEmpty(db)) {
+ return;
+ }
// Write input and output file names to compaction log
- appendToCurrentCompactionLog(sb.toString());
+ appendToCurrentCompactionLog(content);
// Populate the DAG
// TODO: Once SnapshotChainManager is put into use, set snapshotID to
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]