Repository: cassandra
Updated Branches:
  refs/heads/trunk bf85616f6 -> edade5ac3
Fixes race during construction of commit log: - Moves thread start outside of constructor for all commit log classes. - Removes all references to CommitLog.instance from commitlog package. patch by Branimir Lambov; reviewed by tjake for CASSANDRA-10049 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01a11fd2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01a11fd2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01a11fd2 Branch: refs/heads/trunk Commit: 01a11fd2626d57bf0c8d0bce1e43060017592896 Parents: e1086bc Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Aug 12 11:59:41 2015 +0300 Committer: T Jake Luciani <j...@apache.org> Committed: Fri Aug 28 09:43:34 2015 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/AbstractCommitLogService.java | 2 +- .../cassandra/db/commitlog/CommitLog.java | 24 ++++++++----- .../db/commitlog/CommitLogArchiver.java | 38 +++++++++++++------- .../db/commitlog/CommitLogReplayer.java | 14 ++++---- .../db/commitlog/CommitLogSegmentManager.java | 13 ++----- .../db/commitlog/CommitLogStressTest.java | 9 ++--- .../unit/org/apache/cassandra/SchemaLoader.java | 2 +- .../db/commitlog/CommitLogTestReplayer.java | 10 +++--- .../db/commitlog/CommitLogUpgradeTest.java | 2 +- 10 files changed, 65 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6cffd18..c215a50 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.1 + * Fix race during construction of commit log (CASSANDRA-10049) * Fix LeveledCompactionStrategyTest (CASSANDRA-9757) * Fix broken UnbufferedDataOutputStreamPlus.writeUTF (CASSANDRA-10203) * (cqlsh) add 
CLEAR command (CASSANDRA-10086) http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index fa981a3..702ace5 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -186,7 +186,7 @@ public abstract class AbstractCommitLogService /** * FOR TESTING ONLY */ - public void startUnsafe() + public void restartUnsafe() { while (haveWork.availablePermits() < 1) haveWork.release(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index f23ebae..63005d7 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -76,7 +76,7 @@ public class CommitLog implements CommitLogMBean static private CommitLog construct() { - CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), new CommitLogArchiver()); + CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), CommitLogArchiver.construct()); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try @@ -87,7 +87,7 @@ public class CommitLog implements CommitLogMBean { throw new RuntimeException(e); } - return log; + return log.start(); } @VisibleForTesting @@ -107,12 +107,18 @@ public class CommitLog implements CommitLogMBean : new PeriodicCommitLogService(this); allocator = new CommitLogSegmentManager(this); - executor.start(); // 
register metrics metrics.attach(executor, allocator); } + CommitLog start() + { + executor.start(); + allocator.start(); + return this; + } + /** * Perform recovery on commit logs located in the directory specified by the config file. * @@ -179,7 +185,7 @@ public class CommitLog implements CommitLogMBean */ public int recover(File... clogs) throws IOException { - CommitLogReplayer recovery = CommitLogReplayer.create(); + CommitLogReplayer recovery = CommitLogReplayer.construct(this); recovery.recover(clogs); return recovery.blockForWrites(); } @@ -189,7 +195,7 @@ public class CommitLog implements CommitLogMBean */ public void recover(String path) throws IOException { - CommitLogReplayer recovery = CommitLogReplayer.create(); + CommitLogReplayer recovery = CommitLogReplayer.construct(this); recovery.recover(new File(path), false); recovery.blockForWrites(); } @@ -411,7 +417,7 @@ public class CommitLog implements CommitLogMBean public int resetUnsafe(boolean deleteSegments) throws IOException { stopUnsafe(deleteSegments); - return startUnsafe(); + return restartUnsafe(); } /** @@ -434,10 +440,10 @@ public class CommitLog implements CommitLogMBean /** * FOR TESTING PURPOSES. 
See CommitLogAllocator */ - public int startUnsafe() throws IOException + public int restartUnsafe() throws IOException { - allocator.startUnsafe(); - executor.startUnsafe(); + allocator.start(); + executor.restartUnsafe(); return recover(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index 02072de..4c615e0 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -53,33 +53,45 @@ public class CommitLogArchiver } public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>(); - private final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver"); + private final ExecutorService executor; final String archiveCommand; final String restoreCommand; final String restoreDirectories; public long restorePointInTime; public final TimeUnit precision; - public CommitLogArchiver() + public CommitLogArchiver(String archiveCommand, String restoreCommand, String restoreDirectories, + long restorePointInTime, TimeUnit precision) + { + this.archiveCommand = archiveCommand; + this.restoreCommand = restoreCommand; + this.restoreDirectories = restoreDirectories; + this.restorePointInTime = restorePointInTime; + this.precision = precision; + executor = !Strings.isNullOrEmpty(archiveCommand) ? 
new JMXEnabledThreadPoolExecutor("CommitLogArchiver") : null; + } + + public static CommitLogArchiver disabled() + { + return new CommitLogArchiver(null, null, null, Long.MAX_VALUE, TimeUnit.MICROSECONDS); + } + + public static CommitLogArchiver construct() { Properties commitlog_commands = new Properties(); - try (InputStream stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties")) + try (InputStream stream = CommitLogArchiver.class.getClassLoader().getResourceAsStream("commitlog_archiving.properties")) { if (stream == null) { logger.debug("No commitlog_archiving properties found; archive + pitr will be disabled"); - archiveCommand = null; - restoreCommand = null; - restoreDirectories = null; - restorePointInTime = Long.MAX_VALUE; - precision = TimeUnit.MICROSECONDS; + return disabled(); } else { commitlog_commands.load(stream); - archiveCommand = commitlog_commands.getProperty("archive_command"); - restoreCommand = commitlog_commands.getProperty("restore_command"); - restoreDirectories = commitlog_commands.getProperty("restore_directories"); + String archiveCommand = commitlog_commands.getProperty("archive_command"); + String restoreCommand = commitlog_commands.getProperty("restore_command"); + String restoreDirectories = commitlog_commands.getProperty("restore_directories"); if (restoreDirectories != null && !restoreDirectories.isEmpty()) { for (String dir : restoreDirectories.split(DELIMITER)) @@ -95,7 +107,8 @@ public class CommitLogArchiver } } String targetTime = commitlog_commands.getProperty("restore_point_in_time"); - precision = TimeUnit.valueOf(commitlog_commands.getProperty("precision", "MICROSECONDS")); + TimeUnit precision = TimeUnit.valueOf(commitlog_commands.getProperty("precision", "MICROSECONDS")); + long restorePointInTime; try { restorePointInTime = Strings.isNullOrEmpty(targetTime) ? 
Long.MAX_VALUE : format.parse(targetTime).getTime(); @@ -104,6 +117,7 @@ public class CommitLogArchiver { throw new RuntimeException("Unable to parse restore target time", e); } + return new CommitLogArchiver(archiveCommand, restoreCommand, restoreDirectories, restorePointInTime, precision); } } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index af515d2..389b111 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -77,8 +77,9 @@ public class CommitLogReplayer private byte[] uncompressedBuffer; private final ReplayFilter replayFilter; + private final CommitLogArchiver archiver; - CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter) + CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter) { this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); this.futures = new ArrayList<Future<?>>(); @@ -91,9 +92,10 @@ public class CommitLogReplayer this.cfPositions = cfPositions; this.globalPosition = globalPosition; this.replayFilter = replayFilter; + this.archiver = commitLog.archiver; } - public static CommitLogReplayer create() + public static CommitLogReplayer construct(CommitLog commitLog) { // compute per-CF and global replay positions Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>(); @@ -113,7 +115,7 @@ public class CommitLogReplayer // Point in time restore is taken to mean that the tables need to be recovered even if they were // deleted at a later point in time. 
Any truncation record after that point must thus be cleared prior // to recovery (CASSANDRA-9195). - long restoreTime = CommitLog.instance.archiver.restorePointInTime; + long restoreTime = commitLog.archiver.restorePointInTime; long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId); if (truncatedTime > restoreTime) { @@ -135,7 +137,7 @@ public class CommitLogReplayer } ReplayPosition globalPosition = replayPositionOrdering.min(cfPositions.values()); logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPositions)); - return new CommitLogReplayer(globalPosition, cfPositions, replayFilter); + return new CommitLogReplayer(commitLog, globalPosition, cfPositions, replayFilter); } public void recover(File[] clogs) throws IOException @@ -604,11 +606,11 @@ public class CommitLogReplayer protected boolean pointInTimeExceeded(Mutation fm) { - long restoreTarget = CommitLog.instance.archiver.restorePointInTime; + long restoreTarget = archiver.restorePointInTime; for (ColumnFamily families : fm.getColumnFamilies()) { - if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget) + if (archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget) return true; } return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 3f00e97..5918474 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -95,13 +95,12 @@ public class CommitLogSegmentManager private volatile boolean run = true; private final CommitLog commitLog; - public 
CommitLogSegmentManager(final CommitLog commitLog) + CommitLogSegmentManager(final CommitLog commitLog) { this.commitLog = commitLog; - start(); } - private void start() + void start() { // The run loop for the manager thread Runnable runnable = new WrappedRunnable() @@ -526,14 +525,6 @@ public class CommitLogSegmentManager } /** - * Starts CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. - */ - public void startUnsafe() - { - start(); - } - - /** * Initiates the shutdown process for the management thread. */ public void shutdown() http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 5897dec..1b4edee 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -111,6 +111,7 @@ public class CommitLogStressTest initialize(); CommitLogStressTest tester = new CommitLogStressTest(); + tester.cleanDir(); tester.testFixedSize(); } catch (Throwable e) @@ -206,7 +207,7 @@ public class CommitLogStressTest for (CommitLogSync sync : CommitLogSync.values()) { DatabaseDescriptor.setCommitLogSync(sync); - CommitLog commitLog = new CommitLog(location, CommitLog.instance.archiver); + CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start(); testLog(commitLog); } } @@ -271,7 +272,7 @@ public class CommitLogStressTest System.out.print("Stopped. Replaying... 
"); System.out.flush(); - Replayer repl = new Replayer(); + Replayer repl = new Replayer(commitLog); File[] files = new File(location).listFiles(); repl.recover(files); @@ -442,9 +443,9 @@ public class CommitLogStressTest class Replayer extends CommitLogReplayer { - Replayer() + Replayer(CommitLog log) { - super(discardedPos, null, ReplayFilter.create()); + super(log, discardedPos, null, ReplayFilter.create()); } int hash = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 46f4a9a..a7cf7b4 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -463,7 +463,7 @@ public class SchemaLoader mkdirs(); cleanup(); mkdirs(); - CommitLog.instance.startUnsafe(); + CommitLog.instance.restartUnsafe(); } public static void cleanup() http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index 4ad49ec..c377a21 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -40,21 +40,21 @@ public class CommitLogTestReplayer extends CommitLogReplayer { CommitLog.instance.sync(true); - CommitLogTestReplayer replayer = new CommitLogTestReplayer(processor); + CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor); File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); replayer.recover(commitLogDir.listFiles()); } final 
private Predicate<Mutation> processor; - public CommitLogTestReplayer(Predicate<Mutation> processor) + public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor) { - this(ReplayPosition.NONE, processor); + this(log, ReplayPosition.NONE, processor); } - public CommitLogTestReplayer(ReplayPosition discardedPos, Predicate<Mutation> processor) + public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predicate<Mutation> processor) { - super(discardedPos, null, ReplayFilter.create()); + super(log, discardedPos, null, ReplayFilter.create()); this.processor = processor; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java index af85d5d..9de2628 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java @@ -157,7 +157,7 @@ public class CommitLogUpgradeTest } Hasher hasher = new Hasher(); - CommitLogTestReplayer replayer = new CommitLogTestReplayer(hasher); + CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, hasher); File[] files = new File(location).listFiles(new FilenameFilter() { @Override