Merge branch 'cassandra-3.0' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84e40cb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84e40cb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84e40cb6

Branch: refs/heads/trunk
Commit: 84e40cb6bee66f08c18d5522cd3f95fe474af84d
Parents: 5c84fe4 5ee6e7b
Author: Marcus Eriksson <[email protected]>
Authored: Thu Jun 23 11:28:09 2016 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Thu Jun 23 11:28:09 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  30 ++--
 .../repair/RepairMessageVerbHandler.java        |  32 ++--
 .../cassandra/service/ActiveRepairService.java  | 175 +++++++++++++++++--
 .../service/ActiveRepairServiceTest.java        | 126 +++++++++++--
 5 files changed, 306 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index affe18b,edcb4f9..312daed
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -90,27 -90,22 +90,23 @@@ public class RepairMessageVerbHandler i
                                                                       
desc.keyspace, desc.columnFamily), message.from, id);
                          return;
                      }
-                     final Collection<Range<Token>> repairingRange = 
desc.ranges;
-                     Set<SSTableReader> snapshottedSSSTables = 
cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
++
+                     ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+                     if (prs.isGlobal)
                      {
-                         public boolean apply(SSTableReader sstable)
-                         {
-                             return sstable != null &&
-                                    !sstable.metadata.isIndex() && // exclude 
SSTables from 2i
-                                    new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(repairingRange);
-                         }
-                     }, true, false); //ephemeral snapshot, if repair fails, 
it will be cleaned next startup
-                     if 
(ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).isGlobal)
+                         prs.maybeSnapshot(cfs.metadata.cfId, 
desc.parentSessionId);
+                     }
+                     else
                      {
-                         Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, 
desc.parentSessionId);
-                         if (!Sets.intersection(currentlyRepairing, 
snapshottedSSSTables).isEmpty())
+                         cfs.snapshot(desc.sessionId.toString(), new 
Predicate<SSTableReader>()
                          {
-                             // clear snapshot that we just created
-                             cfs.clearSnapshot(desc.sessionId.toString());
-                             logErrorAndSendFailureResponse("Cannot start 
multiple repair sessions over the same sstables", message.from, id);
-                             return;
-                         }
-                         
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId,
 snapshottedSSSTables);
+                             public boolean apply(SSTableReader sstable)
+                             {
+                                 return sstable != null &&
+                                        !sstable.metadata.isIndex() && // 
exclude SSTables from 2i
+                                        new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(desc.ranges);
+                             }
 -                        }, true); //ephemeral snapshot, if repair fails, it 
will be cleaned next startup
++                        }, true, false); //ephemeral snapshot, if repair 
fails, it will be cleaned next startup
                      }
                      logger.debug("Enqueuing response to snapshot request {} 
to {}", desc.sessionId, message.from);
                      MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 12d0f1e,27c2424..462324c
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -490,6 -557,90 +557,90 @@@ public class ActiveRepairService implem
              return new Refs<>(references.build());
          }
  
+         /**
+          * If we are running a snapshot repair we need to find the 'real' 
sstables when we start anticompaction
+          *
+          * We use the generation of the sstables as identifiers instead of 
the file name to avoid having to parse out the
+          * actual filename.
+          *
+          * @param cfId
+          * @param parentSessionId
+          * @return
+          */
+         private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, 
UUID parentSessionId)
+         {
+             Set<SSTableReader> activeSSTables = new HashSet<>();
+             ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+ 
+             Set<Integer> snapshotGenerations = new HashSet<>();
+             try (Refs<SSTableReader> snapshottedSSTables = 
cfs.getSnapshotSSTableReader(parentSessionId.toString()))
+             {
+                 for (SSTableReader sstable : snapshottedSSTables)
+                 {
+                     snapshotGenerations.add(sstable.descriptor.generation);
+                 }
+             }
+             catch (IOException e)
+             {
+                 throw new RuntimeException(e);
+             }
+             for (SSTableReader sstable : 
cfs.getSSTables(SSTableSet.CANONICAL))
+                 if 
(snapshotGenerations.contains(sstable.descriptor.generation))
+                     activeSSTables.add(sstable);
+             return activeSSTables;
+         }
+ 
+         public synchronized void maybeSnapshot(UUID cfId, UUID 
parentSessionId)
+         {
+             String snapshotName = parentSessionId.toString();
+             if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName))
+             {
+                 Set<SSTableReader> snapshottedSSTables = 
columnFamilyStores.get(cfId).snapshot(snapshotName, new 
Predicate<SSTableReader>()
+                 {
+                     public boolean apply(SSTableReader sstable)
+                     {
+                         return sstable != null &&
+                                (!isIncremental || !sstable.isRepaired()) &&
+                                !(sstable.metadata.isIndex()) && // exclude 
SSTables from 2i
+                                new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges);
+                     }
 -                }, true);
++                }, true, false);
+ 
+                 if (isAlreadyRepairing(cfId, parentSessionId, 
snapshottedSSTables))
+                 {
+                     
columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString());
+                     logger.error("Cannot start multiple repair sessions over 
the same sstables");
+                     throw new RuntimeException("Cannot start multiple repair 
sessions over the same sstables");
+                 }
+                 addSSTables(cfId, snapshottedSSTables);
+                 marked.add(cfId);
+             }
+         }
+ 
+ 
+         /**
+          * Compares other repairing sstables *generation* to the ones we just 
snapshotted
+          *
+          * we compare generations since the sstables have different paths due 
to snapshot names
+          *
+          * @param cfId id of the column family store
+          * @param parentSessionId parent repair session
+          * @param sstables the newly snapshotted sstables
+          * @return
+          */
+         private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, 
Collection<SSTableReader> sstables)
+         {
+             Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+             Set<Integer> currentlyRepairingGenerations = new HashSet<>();
+             Set<Integer> newRepairingGenerations = new HashSet<>();
+             for (SSTableReader sstable : currentlyRepairing)
+                 
currentlyRepairingGenerations.add(sstable.descriptor.generation);
+             for (SSTableReader sstable : sstables)
+                 newRepairingGenerations.add(sstable.descriptor.generation);
+ 
+             return !Sets.intersection(currentlyRepairingGenerations, 
newRepairingGenerations).isEmpty();
+         }
+ 
          private Set<SSTableReader> getActiveSSTables(UUID cfId)
          {
              Set<String> repairedSSTables = sstableMap.get(cfId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84e40cb6/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index da067fd,adcd684..2c1a8d2
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -261,6 -263,93 +263,94 @@@ public class ActiveRepairServiceTes
          refs.release();
      }
  
+     @Test
+     public void testAddingMoreSSTables()
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
+         UUID prsId = UUID.randomUUID();
+         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, System.currentTimeMillis(), true);
++
+         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
+         prs.markSSTablesRepairing(store.metadata.cfId, prsId);
+         try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+             assertEquals(original, retrieved);
+         }
+         createSSTables(store, 2);
+         boolean exception = false;
+         try
+         {
+             UUID newPrsId = UUID.randomUUID();
+             
ActiveRepairService.instance.registerParentRepairSession(newPrsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, System.currentTimeMillis(), true);
+             
ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId,
 newPrsId);
+         }
+         catch (Throwable t)
+         {
+             exception = true;
+         }
+         assertTrue(exception);
+ 
+         try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+             assertEquals(original, retrieved);
+         }
+     }
+ 
+     @Test
+     public void testSnapshotAddSSTables() throws ExecutionException, 
InterruptedException
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         UUID prsId = UUID.randomUUID();
+         Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
+         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
+         
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
+ 
+         UUID prsId2 = UUID.randomUUID();
+         ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
+         createSSTables(store, 2);
+         
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
+         try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
+         {
+             assertEquals(original, Sets.newHashSet(refs.iterator()));
+         }
+         store.forceMajorCompaction();
+         // after a major compaction the original sstables will be gone and we 
will have no sstables to anticompact:
+         try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
+         {
+             assertEquals(0, refs.size());
+         }
+     }
+ 
+     @Test
+     public void testSnapshotMultipleRepairs()
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
+         UUID prsId = UUID.randomUUID();
+         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
+         
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
+ 
+         UUID prsId2 = UUID.randomUUID();
+         ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
+         boolean exception = false;
+         try
+         {
+             
ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId,
 prsId2);
+         }
+         catch (Throwable t)
+         {
+             exception = true;
+         }
+         assertTrue(exception);
+         try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
+         {
+             assertEquals(original, Sets.newHashSet(refs.iterator()));
+         }
+     }
+ 
      private ColumnFamilyStore prepareColumnFamilyStore()
      {
          Keyspace keyspace = Keyspace.open(KEYSPACE5);

Reply via email to