Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 48abc0369 -> 225677872
Add parent repair session id to anticompaction log message Patch by Tommy Stendahl; Reviewed by Paulo Motta for CASSANDRA-12186 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/22567787 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/22567787 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/22567787 Branch: refs/heads/cassandra-3.0 Commit: 2256778726319fb76b6d85c4a47a957116c78147 Parents: 48abc03 Author: tommy stendahl <[email protected]> Authored: Tue Nov 15 14:52:57 2016 +0100 Committer: Paulo Motta <[email protected]> Committed: Thu Dec 15 17:38:20 2016 -0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 21 +++++++++++--------- .../cassandra/service/ActiveRepairService.java | 4 ++-- .../db/compaction/AntiCompactionTest.java | 17 ++++++++++------ 4 files changed, 26 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 63e095d..a40dabd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * Add parent repair session id to anticompaction log message (CASSANDRA-12186) * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905) * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829) * Mark MVs as built after successful bootstrap (CASSANDRA-12984) http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a77cefb..28140e0 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -456,7 +456,8 @@ public class CompactionManager implements CompactionManagerMBean public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, final Collection<Range<Token>> ranges, final Refs<SSTableReader> sstables, - final long repairedAt) + final long repairedAt, + final UUID parentRepairSession) { Runnable runnable = new WrappedRunnable() { @Override @@ -475,7 +476,7 @@ public class CompactionManager implements CompactionManagerMBean sstables.release(compactedSSTables); modifier = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); } - performAnticompaction(cfs, ranges, sstables, modifier, repairedAt); + performAnticompaction(cfs, ranges, sstables, modifier, repairedAt, parentRepairSession); } }; @@ -500,6 +501,7 @@ public class CompactionManager implements CompactionManagerMBean * @param cfs * @param ranges Ranges that the repair was carried out on * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them. + * @param parentRepairSession parent repair session ID * @throws InterruptedException * @throws IOException */ @@ -507,10 +509,11 @@ public class CompactionManager implements CompactionManagerMBean Collection<Range<Token>> ranges, Refs<SSTableReader> validatedForRepair, LifecycleTransaction txn, - long repairedAt) throws InterruptedException, IOException + long repairedAt, + UUID parentRepairSession) throws InterruptedException, IOException { - logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getLiveSSTables()); - logger.trace("Starting anticompaction for ranges {}", ranges); + logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables()); + logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // we should only notify that repair status changed if it actually did: @@ -538,7 +541,7 @@ public class CompactionManager implements CompactionManagerMBean { if (r.contains(sstableRange)) { - logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r); + logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r); sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt); sstable.reloadSSTableMetadata(); mutatedRepairStatuses.add(sstable); @@ -550,14 +553,14 @@ public class CompactionManager implements CompactionManagerMBean } else if (sstableRange.intersects(r)) { - logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r); + logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableRange, r); shouldAnticompact = true; } } if (!shouldAnticompact) { - logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableRange, normalizedRanges); + logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableRange, normalizedRanges); nonAnticompacting.add(sstable); sstableIterator.remove(); } @@ -576,7 +579,7 @@ public class CompactionManager implements CompactionManagerMBean txn.close(); } - logger.info("Completed anticompaction successfully"); + logger.info("[repair #{}] Completed anticompaction successfully", parentRepairSession); } public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput) http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 6f7b1a4..97c5c0a 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -433,7 +433,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai //in addition to other scenarios such as repairs not involving all DCs or hosts if (!prs.isGlobal) { - logger.info("Not a global repair, will not do anticompaction"); + logger.info("[repair #{}] Not a global repair, will not do anticompaction", parentRepairSession); removeParentRepairSession(parentRepairSession); return Futures.immediateFuture(Collections.emptyList()); } @@ -447,7 +447,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai { Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession); ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); - futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt)); + futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt, parentRepairSession)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index db07eb8..4c25f3a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.UUID; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -99,7 +100,8 @@ public class AntiCompactionTest if (txn == null) throw new IllegalStateException(); long repairedAt = 1000; - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt); + UUID parentRepairSession = UUID.randomUUID(); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession); } assertEquals(2, store.getLiveSSTables().size()); @@ -144,10 +146,11 @@ public class AntiCompactionTest long origSize = s.bytesOnDisk(); Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500))); Collection<SSTableReader> sstables = cfs.getLiveSSTables(); + UUID parentRepairSession = UUID.randomUUID(); try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345); + CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, parentRepairSession); } long sum = 0; long rows = 0; @@ -226,10 +229,11 @@ public class AntiCompactionTest List<Range<Token>> ranges = Arrays.asList(range); long repairedAt = 1000; + UUID parentRepairSession = UUID.randomUUID(); try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession); } /* Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time @@ -274,11 +278,12 @@ public class AntiCompactionTest assertEquals(store.getLiveSSTables().size(), sstables.size()); Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); + UUID parentRepairSession = UUID.randomUUID(); try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession); } assertThat(store.getLiveSSTables().size(), is(1)); @@ -304,12 +309,12 @@ public class AntiCompactionTest Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); - + UUID parentRepairSession = UUID.randomUUID(); try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession); } assertThat(store.getLiveSSTables().size(), is(10));
