Updated Branches: refs/heads/cassandra-1.1 861f1f3a9 -> 08848e795 refs/heads/trunk ca104bac3 -> 4d7e70356
avoid blocking additional writes during flush patch by jbellis; reviewed by slebresne and tested by brandonwilliams for CASSANDRA-1991 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/08848e79 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/08848e79 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/08848e79 Branch: refs/heads/cassandra-1.1 Commit: 08848e7956f5fd08525a08498205637b2652f2a7 Parents: 67ed39f Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed May 9 14:51:24 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed May 9 14:52:18 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 12 +++--- src/java/org/apache/cassandra/db/Memtable.java | 12 +++--- .../apache/cassandra/db/commitlog/CommitLog.java | 19 +++-------- .../db/compaction/LeveledCompactionStrategy.java | 25 +++++++++++++-- .../cassandra/db/compaction/LeveledManifest.java | 9 +++++ .../org/apache/cassandra/db/CommitLogTest.java | 6 ++-- 7 files changed, 53 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f17ffd1..9246433 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 1.1.1-dev + * avoid blocking additional writes during flush when the commitlog + gets behind temporarily (CASSANDRA-1991) * enable caching on index CFs based on data CF cache setting (CASSANDRA-4197) * warn on invalid replication strategy creation options (CASSANDRA-4046) * remove [Freeable]Memory finalizers (CASSANDRA-4222) http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 659be73..9dcf1ef 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -31,6 +31,7 @@ import java.util.regex.Pattern; import javax.management.*; import com.google.common.collect.*; +import com.google.common.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -609,8 +610,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } assert getMemtableThreadSafe() == oldMemtable; - final ReplayPosition ctx = writeCommitLog ? CommitLog.instance.getContext() : ReplayPosition.NONE; - logger.debug("flush position is {}", ctx); + final Future<ReplayPosition> ctx = writeCommitLog ? CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE); // submit the memtable for any indexed sub-cfses, and our own. final List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>(); @@ -642,7 +642,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // while keeping the wait-for-flush (future.get) out of anything latency-sensitive. 
return postFlushExecutor.submit(new WrappedRunnable() { - public void runMayThrow() throws InterruptedException, IOException + public void runMayThrow() throws InterruptedException, IOException, ExecutionException { latch.await(); @@ -662,7 +662,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { // if we're not writing to the commit log, we are replaying the log, so marking // the log header with "you can discard anything written before the context" is not valid - CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx); + CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx.get()); } } }); @@ -1710,13 +1710,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (ksm.durableWrites) { CommitLog.instance.forceNewSegment(); - ReplayPosition position = CommitLog.instance.getContext(); + Future<ReplayPosition> position = CommitLog.instance.getContext(); // now flush everyone else. re-flushing ourselves is not necessary, but harmless for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) cfs.forceFlush(); waitForActiveFlushes(); // if everything was clean, flush won't have called discard - CommitLog.instance.discardCompletedSegments(metadata.cfId, position); + CommitLog.instance.discardCompletedSegments(metadata.cfId, position.get()); } // sleep a little to make sure that our truncatedAt comes after any sstable http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 1eddf15..f1dcc56 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -261,7 +261,7 @@ public class Memtable } - private SSTableReader writeSortedContents(ReplayPosition context) throws IOException + private SSTableReader 
writeSortedContents(Future<ReplayPosition> context) throws IOException, ExecutionException, InterruptedException { logger.info("Writing " + this); @@ -278,7 +278,7 @@ public class Memtable * 1.2); // bloom filter and row index overhead SSTableReader ssTable; // errors when creating the writer that may leave empty temp files. - SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context); + SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context.get()); try { // (we can't clear out the map as-we-go to free up memory, @@ -304,16 +304,16 @@ public class Memtable writer.abort(); throw FBUtilities.unchecked(e); } - logger.info(String.format("Completed flushing %s (%d bytes)", - ssTable.getFilename(), new File(ssTable.getFilename()).length())); + logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s", + ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get())); return ssTable; } - public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final ReplayPosition context) + public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final Future<ReplayPosition> context) { writer.execute(new WrappedRunnable() { - public void runMayThrow() throws IOException + public void runMayThrow() throws Exception { SSTableReader sstable = writeSortedContents(context); cfs.replaceFlushed(Memtable.this, sstable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 3c34772..a490569 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -23,6 +23,7 @@ import 
java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.*; @@ -159,9 +160,10 @@ public class CommitLog implements CommitLogMBean } /** - * @return the current ReplayPosition of the current segment file + * @return a Future representing a ReplayPosition such that when it is ready, + * all commitlog tasks enqueued prior to the getContext call will be complete (i.e., appended to the log) */ - public ReplayPosition getContext() + public Future<ReplayPosition> getContext() { Callable<ReplayPosition> task = new Callable<ReplayPosition>() { @@ -170,18 +172,7 @@ public class CommitLog implements CommitLogMBean return activeSegment.getContext(); } }; - try - { - return executor.submit(task).get(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + return executor.submit(task); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 858a2bc..5403aa2 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -172,21 +172,40 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem { Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create(); for (SSTableReader sstable : sstables) - byLevel.get(manifest.levelOf(sstable)).add(sstable); + { + int level = manifest.levelOf(sstable); + assert level 
>= 0; + byLevel.get(level).add(sstable); + } List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size()); for (Integer level : byLevel.keySet()) { if (level == 0) { - // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each + // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each. for (SSTableReader sstable : byLevel.get(level)) scanners.add(sstable.getDirectScanner(range)); } else { // Create a LeveledScanner that only opens one sstable at a time, in sorted order - scanners.add(new LeveledScanner(byLevel.get(level), range)); + ArrayList<SSTableReader> sstables1 = new ArrayList<SSTableReader>(byLevel.get(level)); + scanners.add(new LeveledScanner(sstables1, range)); + + Collections.sort(sstables1, SSTable.sstableComparator); + SSTableReader previous = null; + for (SSTableReader sstable : sstables1) + { + assert previous == null || sstable.first.compareTo(previous.last) > 0 : String.format("%s >= %s in %s and %s for %s in %s", + previous.last, + sstable.first, + previous, + sstable, + sstable.getColumnFamilyName(), + manifest.getLevel(level)); + previous = sstable; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 69ab492..c3517e1 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -188,6 
+189,14 @@ public class LeveledManifest for (SSTableReader ssTableReader : added) add(ssTableReader, newLevel); + DecoratedKey last = null; + Collections.sort(generations[newLevel], SSTable.sstableComparator); + for (SSTableReader sstable : generations[newLevel]) + { + assert last == null || sstable.first.compareTo(last) > 0; + last = sstable.last; + } + serialize(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index e4b04a4..4e48d73 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -112,7 +112,7 @@ public class CommitLogTest extends SchemaLoader assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); int cfid2 = rm2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); + CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext().get()); // Assert we still have both our segment assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); @@ -134,7 +134,7 @@ public class CommitLogTest extends SchemaLoader // "Flush": this won't delete anything int cfid1 = rm.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext()); + CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext().get()); assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); @@ -152,7 +152,7 @@ public class CommitLogTest extends SchemaLoader // didn't write anything on cf1 since last flush (and we flush 
cf2) int cfid2 = rm2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); + CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext().get()); // Assert we still have both our segment assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();