Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 5135905a0 -> 063b37614 refs/heads/trunk f5b2660bd -> 2ae587f5c
LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-11470 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/063b3761 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/063b3761 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/063b3761 Branch: refs/heads/cassandra-3.0 Commit: 063b37614442b3184342a04c50c7961d1777e13b Parents: 5135905 Author: Stefania Alborghetti <[email protected]> Authored: Thu Apr 7 15:09:41 2016 +0800 Committer: Marcus Eriksson <[email protected]> Committed: Mon Apr 11 13:23:50 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/lifecycle/LogAwareFileLister.java | 43 ++++++++----- .../apache/cassandra/db/lifecycle/LogFile.java | 18 +++++- .../cassandra/db/lifecycle/LogRecord.java | 7 +++ .../db/lifecycle/LogTransactionTest.java | 66 ++++++++++++++++++++ 5 files changed, 118 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/063b3761/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 26ab66d..47e6105 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.6 + * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470) * Notify indexers of expired rows during compaction (CASSANDRA-11329) * Properly respond with ProtocolError when a v1/v2 native protocol header is received (CASSANDRA-11464) http://git-wip-us.apache.org/repos/asf/cassandra/blob/063b3761/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java index 3393b5c..4d3d46d 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java @@ -8,11 +8,13 @@ import java.nio.file.Path; import java.util.*; import java.util.function.BiFunction; import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.stream.StreamSupport; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.Directories; import static org.apache.cassandra.db.Directories.*; @@ -22,6 +24,8 @@ import static org.apache.cassandra.db.Directories.*; */ final class LogAwareFileLister { + private static final Logger logger = LoggerFactory.getLogger(LogAwareFileLister.class); + // The folder to scan private final Path folder; @@ -112,8 +116,8 @@ final class LogAwareFileLister void classifyFiles(LogFile txnFile) { - Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.REMOVE); - Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.ADD); + Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(folder, files.navigableKeySet(), LogRecord.Type.REMOVE); + Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(folder, files.navigableKeySet(), LogRecord.Type.ADD); if (txnFile.completed()) { // last record present, filter regardless of disk status @@ -121,13 +125,13 @@ final class LogAwareFileLister return; } - if (allFilesPresent(txnFile, oldFiles, newFiles)) - { // all files present, transaction is in progress, this will filter as aborted + if (allFilesPresent(oldFiles)) + { // all old files present, transaction is in progress, this will filter as aborted setTemporary(txnFile, oldFiles.values(), newFiles.values()); return; 
} - // some files are missing, we expect the txn file to either also be missing or completed, so check + // some old files are missing, we expect the txn file to either also be missing or completed, so check // disk state again to resolve any previous races on non-atomic directory listing platforms // if txn file also gone, then do nothing (all temporary should be gone, we could remove them if any) @@ -143,23 +147,30 @@ final class LogAwareFileLister return; } - // some files are missing and yet the txn is still there and not completed - // something must be wrong (see comment at the top of this file requiring txn to be + logger.error("Failed to classify files in {}\n" + + "Some old files are missing but the txn log is still there and not completed\n" + + "Files in folder:\n{}\nTxn: {}\n{}", + folder, + files.isEmpty() + ? "\t-" + : String.join("\n", files.keySet().stream().map(f -> String.format("\t%s", f)).collect(Collectors.toList())), + txnFile.toString(), + String.join("\n", txnFile.getRecords().stream().map(r -> String.format("\t%s", r)).collect(Collectors.toList()))); + + // some old files are missing and yet the txn is still there and not completed + // something must be wrong (see comment at the top of LogTransaction requiring txn to be // completed before obsoleting or aborting sstables) throw new RuntimeException(String.format("Failed to list directory files in %s, inconsistent disk state for transaction %s", folder, txnFile)); } - /** See if all files are present or if only the last record files are missing and it's a NEW record */ - private static boolean allFilesPresent(LogFile txnFile, Map<LogRecord, Set<File>> oldFiles, Map<LogRecord, Set<File>> newFiles) + /** See if all files are present */ + private static boolean allFilesPresent(Map<LogRecord, Set<File>> oldFiles) { - LogRecord lastRecord = txnFile.getLastRecord(); - return !Stream.concat(oldFiles.entrySet().stream(), - newFiles.entrySet().stream() - .filter((e) -> e.getKey() != lastRecord)) - 
.filter((e) -> e.getKey().numFiles > e.getValue().size()) - .findFirst().isPresent(); + return !oldFiles.entrySet().stream() + .filter((e) -> e.getKey().numFiles > e.getValue().size()) + .findFirst().isPresent(); } private void setTemporary(LogFile txnFile, Collection<Set<File>> oldFiles, Collection<Set<File>> newFiles) http://git-wip-us.apache.org/repos/asf/cassandra/blob/063b3761/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java index 9064e5f..4c3e550 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@ -1,6 +1,7 @@ package org.apache.cassandra.db.lifecycle; import java.io.File; +import java.nio.file.Path; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -315,13 +316,23 @@ final class LogFile files.forEach(LogTransaction::delete); } - Map<LogRecord, Set<File>> getFilesOfType(NavigableSet<File> files, Type type) + /** + * Extract from the files passed in all those that are of the given type. + * + * Scan all records and select those that are of the given type, valid, and + * located in the same folder. For each such record extract from the files passed in + * those that belong to this record. + * + * @return a map linking each mapped record to its files, where the files were passed in as parameters.
+ */ + Map<LogRecord, Set<File>> getFilesOfType(Path folder, NavigableSet<File> files, Type type) { Map<LogRecord, Set<File>> ret = new HashMap<>(); records.stream() .filter(type::matches) .filter(LogRecord::isValid) + .filter(r -> r.isInFolder(folder)) .forEach((r) -> ret.put(r, getRecordFiles(files, r))); return ret; @@ -378,4 +389,9 @@ final class LogFile LogFile.EXT); return StringUtils.join(folder, File.separator, fileName); } + + Collection<LogRecord> getRecords() + { + return records; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/063b3761/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java index 9e606fc..9b7d59e 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java @@ -255,6 +255,13 @@ final class LogRecord return absolutePath.isPresent() ? Paths.get(absolutePath.get()).getFileName().toString() : ""; } + boolean isInFolder(Path folder) + { + return absolutePath.isPresent() + ? FileUtils.isContained(folder.toFile(), Paths.get(absolutePath.get()).toFile()) + : false; + } + String absolutePath() { return absolutePath.isPresent() ? 
absolutePath.get() : ""; http://git-wip-us.apache.org/repos/asf/cassandra/blob/063b3761/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java index 45b5844..0f03baf 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java @@ -831,6 +831,72 @@ public class LogTransactionTest extends AbstractTransactionalTest } @Test + public void testGetTemporaryFilesMultipleFolders() throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + // they should all have the same number of files since they are created in the same way + int numSStableFiles = sstables[0].getAllFilePaths().size(); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + for (File dataFolder : new File[] {dataFolder1, dataFolder2}) + { + Set<File> tmpFiles = getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + } + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + for (File dataFolder : new File[] {dataFolder1, dataFolder2}) + { + Set<File> tmpFiles = getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + 
assertEquals(numSStableFiles, tmpFiles.size()); + } + + log.finish(); + + for (File dataFolder : new File[] {dataFolder1, dataFolder2}) + { + Set<File> tmpFiles = getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(numSStableFiles, tmpFiles.size()); + } + + sstables[0].markObsolete(tidiers[0]); + sstables[2].markObsolete(tidiers[1]); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + LogTransaction.waitForDeletions(); + + for (File dataFolder : new File[] {dataFolder1, dataFolder2}) + { + Set<File> tmpFiles = getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + } + + } + + @Test public void testWrongChecksumLastLine() throws IOException { testCorruptRecord((t, s) ->
