Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 1879d9928 -> 60eab4e45
Fix truncate to always call flush on table Patch by Jeremiah Jordan; reviewed by tjake for (CASSANDRA-7511) Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60eab4e4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60eab4e4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60eab4e4 Branch: refs/heads/cassandra-2.0 Commit: 60eab4e45e18d6b08350187acf56deed9654fda7 Parents: 1879d99 Author: Jake Luciani <j...@apache.org> Authored: Fri Aug 1 10:30:48 2014 -0400 Committer: Jake Luciani <j...@apache.org> Committed: Fri Aug 1 10:30:48 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 5 +++ .../apache/cassandra/db/ColumnFamilyStore.java | 29 +++--------------- .../org/apache/cassandra/db/DataTracker.java | 18 ----------- test/conf/cassandra.yaml | 1 + .../org/apache/cassandra/db/CommitLogTest.java | 32 ++++++++++++++++++++ 6 files changed, 44 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1fcb556..33bab82 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.10 + * Fix truncate to always flush (CASSANDRA-7511) * Remove shuffle and taketoken (CASSANDRA-7601) * Switch liveRatio-related log messages to DEBUG (CASSANDRA-7467) * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
index bf0307b..d4c1f26 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1131,6 +1131,11 @@ public class DatabaseDescriptor return conf.auto_snapshot; } + @VisibleForTesting + public static void setAutoSnapshot(boolean autoSnapshot) { + conf.auto_snapshot = autoSnapshot; + } + public static boolean isAutoBootstrap() { return conf.auto_bootstrap; http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 2824924..a3c080a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2002,31 +2002,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // position in the System keyspace. logger.debug("truncating {}", name); - if (DatabaseDescriptor.isAutoSnapshot()) - { - // flush the CF being truncated before forcing the new segment - forceBlockingFlush(); - - // sleep a little to make sure that our truncatedAt comes after any sstable - // that was part of the flushed we forced; otherwise on a tie, it won't get deleted. 
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); - } + // flush the CF being truncated before forcing the new segment + forceBlockingFlush(); - // nuke the memtable data w/o writing to disk first - Keyspace.switchLock.writeLock().lock(); - try - { - for (ColumnFamilyStore cfs : concatWithIndexes()) - { - Memtable mt = cfs.getMemtableThreadSafe(); - if (!mt.isClean()) - mt.cfs.data.renewMemtable(); - } - } - finally - { - Keyspace.switchLock.writeLock().unlock(); - } + // sleep a little to make sure that our truncatedAt comes after any sstable + // that was part of the flush we forced; otherwise on a tie, it won't get deleted. + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); Runnable truncateRunnable = new Runnable() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index a9eef98..a0f880a 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -123,24 +123,6 @@ public class DataTracker return toFlushMemtable; } - /** - * Renew the current memtable without putting the old one for a flush. - * Used when we flush but a memtable is clean (in which case we must - * change it because it was frozen). 
- */ - public void renewMemtable() - { - Memtable newMemtable = new Memtable(cfstore, view.get().memtable); - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.renewMemtable(newMemtable); - } - while (!view.compareAndSet(currentView, newView)); - notifyRenewed(currentView.memtable); - } - public void replaceFlushed(Memtable memtable, SSTableReader sstable) { // sstable may be null if we flushed batchlog and nothing needed to be retained http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index d92eba6..3bb29bb 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -6,6 +6,7 @@ cluster_name: Test Cluster in_memory_compaction_limit_in_mb: 1 commitlog_sync: batch commitlog_sync_batch_window_in_ms: 1.0 +commitlog_segment_size_in_mb: 1 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner listen_address: 127.0.0.1 storage_port: 7010 http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index 036ce15..a7df871 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -22,6 +22,7 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -35,8 +36,10 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import 
org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -257,4 +260,33 @@ public class CommitLogTest extends SchemaLoader } } + @Test + public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException + { + CommitLog.instance.resetUnsafe(); + boolean prev = DatabaseDescriptor.isAutoSnapshot(); + DatabaseDescriptor.setAutoSnapshot(false); + ColumnFamilyStore cfs1 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard1"); + ColumnFamilyStore cfs2 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2"); + + final RowMutation rm1 = new RowMutation("Keyspace1", bytes("k")); + rm1.add("Standard1", bytes("c1"), ByteBuffer.allocate(100), 0); + rm1.apply(); + cfs1.truncateBlocking(); + DatabaseDescriptor.setAutoSnapshot(prev); + final RowMutation rm2 = new RowMutation("Keyspace1", bytes("k")); + rm2.add("Standard2", bytes("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0); + + for (int i = 0 ; i < 5 ; i++) + CommitLog.instance.add(rm2); + + Assert.assertEquals(2, CommitLog.instance.activeSegments()); + ReplayPosition position = CommitLog.instance.getContext().get(); + for (Keyspace ks : Keyspace.system()) + for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) + CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position); + CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position); + Assert.assertEquals(1, CommitLog.instance.activeSegments()); + } + }