Repository: cassandra Updated Branches: refs/heads/trunk a7c1729c1 -> 21bb6d2ef
Make sure we set lastCompactedKey properly Patch by marcuse; reviewed by rbranson for CASSANDRA-8463 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35270996 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35270996 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35270996 Branch: refs/heads/trunk Commit: 3527099693607a550d64b0b9e0351d57fd2df45d Parents: c8b4d95 Author: Marcus Eriksson <[email protected]> Authored: Tue Dec 16 11:07:52 2014 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Thu Dec 18 07:33:24 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/AbstractCompactionStrategy.java | 8 +++++ .../compaction/LeveledCompactionStrategy.java | 6 ++++ .../compaction/WrappingCompactionStrategy.java | 38 ++++++++++++++++++-- 4 files changed, 50 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/35270996/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e0652af..edd1026 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463) * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507) * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370) * Make sstablescrub check leveled manifest again (CASSANDRA-8432) http://git-wip-us.apache.org/repos/asf/cassandra/blob/35270996/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 337657d..10abd01 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -300,6 +300,14 @@ public abstract class AbstractCompactionStrategy return getClass().getSimpleName(); } + public synchronized void replaceSSTables(Collection<SSTableReader> removed, Collection<SSTableReader> added) + { + for (SSTableReader remove : removed) + removeSSTable(remove); + for (SSTableReader add : added) + addSSTable(add); + } + public abstract void addSSTable(SSTableReader added); public abstract void removeSSTable(SSTableReader sstable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/35270996/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index dbb9a13..f9e5d16 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -207,6 +207,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy } @Override + public void replaceSSTables(Collection<SSTableReader> removed, Collection<SSTableReader> added) + { + manifest.replace(removed, added); + } + + @Override public void addSSTable(SSTableReader added) { manifest.add(added); http://git-wip-us.apache.org/repos/asf/cassandra/blob/35270996/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java index 84ef97f..c88bdf0 100644 --- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java @@ -20,7 +20,9 @@ package org.apache.cassandra.db.compaction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import org.slf4j.Logger; @@ -213,6 +215,12 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy } @Override + public void replaceSSTables(Collection<SSTableReader> removed, Collection<SSTableReader> added) + { + throw new UnsupportedOperationException("Can't replace sstables in the wrapping compaction strategy"); + } + + @Override public void addSSTable(SSTableReader added) { throw new UnsupportedOperationException("Can't add sstables to the wrapping compaction strategy"); @@ -237,18 +245,42 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy else if (notification instanceof SSTableListChangedNotification) { SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + Set<SSTableReader> repairedRemoved = new HashSet<>(); + Set<SSTableReader> repairedAdded = new HashSet<>(); + Set<SSTableReader> unrepairedRemoved = new HashSet<>(); + Set<SSTableReader> unrepairedAdded = new HashSet<>(); + for (SSTableReader sstable : listChangedNotification.removed) { if (sstable.isRepaired()) - repaired.removeSSTable(sstable); + repairedRemoved.add(sstable); else - unrepaired.removeSSTable(sstable); + unrepairedRemoved.add(sstable); } for (SSTableReader sstable : listChangedNotification.added) { if (sstable.isRepaired()) - repaired.addSSTable(sstable); + repairedAdded.add(sstable); else + unrepairedAdded.add(sstable); + } + if (!repairedRemoved.isEmpty()) + { + repaired.replaceSSTables(repairedRemoved, repairedAdded); + } + else + { + for (SSTableReader sstable : repairedAdded) + repaired.addSSTable(sstable); + } + + if (!unrepairedRemoved.isEmpty()) + { + unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded); + } + else + { + for (SSTableReader sstable : unrepairedAdded) unrepaired.addSSTable(sstable); } }
