Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84e40cb6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84e40cb6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84e40cb6 Branch: refs/heads/trunk Commit: 84e40cb6bee66f08c18d5522cd3f95fe474af84d Parents: 5c84fe4 5ee6e7b Author: Marcus Eriksson <[email protected]> Authored: Thu Jun 23 11:28:09 2016 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Thu Jun 23 11:28:09 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 30 ++-- .../repair/RepairMessageVerbHandler.java | 32 ++-- .../cassandra/service/ActiveRepairService.java | 175 +++++++++++++++++-- .../service/ActiveRepairServiceTest.java | 126 +++++++++++-- 5 files changed, 306 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index affe18b,edcb4f9..312daed --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -90,27 -90,22 +90,23 @@@ public class RepairMessageVerbHandler i desc.keyspace, desc.columnFamily), message.from, id); return; } - final Collection<Range<Token>> repairingRange = desc.ranges; - Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() ++ + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); + if (prs.isGlobal) { - public boolean apply(SSTableReader sstable) - { - return sstable != null && - !sstable.metadata.isIndex() && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(repairingRange); - } - }, true, false); //ephemeral snapshot, if repair fails, it will be cleaned next startup - if (ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).isGlobal) + prs.maybeSnapshot(cfs.metadata.cfId, desc.parentSessionId); + } + else { - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId); - if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty()) + cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() { - // clear snapshot that we just created - cfs.clearSnapshot(desc.sessionId.toString()); - logErrorAndSendFailureResponse("Cannot start multiple repair sessions over the same sstables", message.from, id); - return; - } - ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); + public boolean apply(SSTableReader sstable) + { + return sstable != null && + !sstable.metadata.isIndex() && // exclude SSTables from 2i + new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(desc.ranges); + } - }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup ++ }, true, false); //ephemeral snapshot, if repair fails, it will be cleaned next startup } logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 12d0f1e,27c2424..462324c --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -490,6 -557,90 +557,90 @@@ public class ActiveRepairService implem return new Refs<>(references.build()); } + /** + * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction + * + * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the + * actual filename. + * + * @param cfId + * @param parentSessionId + * @return + */ + private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId) + { + Set<SSTableReader> activeSSTables = new HashSet<>(); + ColumnFamilyStore cfs = columnFamilyStores.get(cfId); + + Set<Integer> snapshotGenerations = new HashSet<>(); + try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString())) + { + for (SSTableReader sstable : snapshottedSSTables) + { + snapshotGenerations.add(sstable.descriptor.generation); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + if (snapshotGenerations.contains(sstable.descriptor.generation)) + activeSSTables.add(sstable); + return activeSSTables; + } + + public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId) + { + String snapshotName = parentSessionId.toString(); + if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName)) + { + Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader sstable) + { + return sstable != null && + (!isIncremental || !sstable.isRepaired()) && + !(sstable.metadata.isIndex()) && // exclude SSTables from 2i + new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges); + } - }, true); ++ }, true, false); + + if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables)) + { + columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString()); + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } + addSSTables(cfId, snapshottedSSTables); + marked.add(cfId); + } + } + + + /** + * Compares other repairing sstables *generation* to the ones we just snapshotted + * + * we compare generations since the sstables have different paths due to snapshot names + * + * @param cfId id of the column family store + * @param parentSessionId parent repair session + * @param sstables the newly snapshotted sstables + * @return + */ + private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables) + { + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); + Set<Integer> currentlyRepairingGenerations = new HashSet<>(); + Set<Integer> newRepairingGenerations = new HashSet<>(); + for (SSTableReader sstable : currentlyRepairing) + currentlyRepairingGenerations.add(sstable.descriptor.generation); + for (SSTableReader sstable : sstables) + newRepairingGenerations.add(sstable.descriptor.generation); + + return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty(); + } + private Set<SSTableReader> getActiveSSTables(UUID cfId) { Set<String> repairedSSTables = sstableMap.get(cfId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index da067fd,adcd684..2c1a8d2 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@@ -261,6 -263,93 +263,94 @@@ public class ActiveRepairServiceTes refs.release(); } + @Test + public void testAddingMoreSSTables() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); + UUID prsId = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true); ++ + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); + prs.markSSTablesRepairing(store.metadata.cfId, prsId); + try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + createSSTables(store, 2); + boolean exception = false; + try + { + UUID newPrsId = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true); + ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId); + } + catch (Throwable t) + { + exception = true; + } + assertTrue(exception); + + try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + } + + @Test + public void testSnapshotAddSSTables() throws ExecutionException, InterruptedException + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + UUID prsId = UUID.randomUUID(); + Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); + + UUID prsId2 = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + createSSTables(store, 2); + ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); + try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + assertEquals(original, Sets.newHashSet(refs.iterator())); + } + store.forceMajorCompaction(); + // after a major compaction the original sstables will be gone and we will have no sstables to anticompact: + try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + assertEquals(0, refs.size()); + } + } + + @Test + public void testSnapshotMultipleRepairs() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); + UUID prsId = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); + + UUID prsId2 = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + boolean exception = false; + try + { + ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId, prsId2); + } + catch (Throwable t) + { + exception = true; + } + assertTrue(exception); + try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + assertEquals(original, Sets.newHashSet(refs.iterator())); + } + } + private ColumnFamilyStore prepareColumnFamilyStore() { Keyspace keyspace = Keyspace.open(KEYSPACE5);
