Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 af509ec9a -> 450091bfb
Fix the way we replace sstables after anticompaction Patch by marcuse; reviewed by yukim for CASSANDRA-10831 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d51b65e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d51b65e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d51b65e Branch: refs/heads/cassandra-2.2 Commit: 0d51b65e32bd2c6343d7a07314e0c88256c73bf0 Parents: f1b9e9a Author: Marcus Eriksson <marc...@apache.org> Authored: Wed Dec 9 13:09:33 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Dec 29 13:45:32 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/compaction/CompactionManager.java | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d51b65e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 41bf6bc..9997e1e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.13 + * Fix the way we replace sstables after anticompaction (CASSANDRA-10831) * cqlsh fails to decode utf-8 characters for text typed columns (CASSANDRA-10875) * Log error when stream session fails (CASSANDRA-9294) * Fix bugs in commit log archiving startup behavior (CASSANDRA-10593) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d51b65e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 9bddaf5..30b8475 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -503,9 +503,9 @@ public class CompactionManager implements CompactionManagerMBean sstableIterator.remove(); } } + validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses)); cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses); cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses)); - validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses)); if (!sstables.isEmpty()) doAntiCompaction(cfs, ranges, sstables, repairedAt); } @@ -1085,6 +1085,7 @@ public class CompactionManager implements CompactionManagerMBean int unrepairedKeyCount = 0; logger.info("Performing anticompaction on {} sstables", repairedSSTables.size()); // iterate over sstables to check if the repaired / unrepaired ranges intersect them. + Set<SSTableReader> successfullyAntiCompactedSSTables = new HashSet<>(); for (SSTableReader sstable : repairedSSTables) { // check that compaction hasn't stolen any sstables used in previous repair sessions @@ -1138,7 +1139,7 @@ public class CompactionManager implements CompactionManagerMBean } anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt)); anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE)); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); + successfullyAntiCompactedSSTables.add(sstable); } catch (Throwable e) { @@ -1148,6 +1149,7 @@ public class CompactionManager implements CompactionManagerMBean unRepairedSSTableWriter.abort(); } } + cfs.getDataTracker().markCompactedSSTablesReplaced(successfullyAntiCompactedSSTables, anticompactedSSTables, OperationType.ANTICOMPACTION); String format = "Repaired {} keys of {} for {}/{}"; logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName()); String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";