Merge branch 'cassandra-2.1' into trunk

Conflicts:
        src/java/org/apache/cassandra/db/compaction/CompactionManager.java
        src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
        src/java/org/apache/cassandra/service/ActiveRepairService.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d65d264
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d65d264
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d65d264

Branch: refs/heads/trunk
Commit: 6d65d264eed638e86d19c102a2095e3e42f9deb4
Parents: b66a547 6d9d175
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Jan 21 10:24:02 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Jan 21 10:24:02 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  42 ++++-
 .../repair/RepairMessageVerbHandler.java        | 157 ++++++++++---------
 .../cassandra/service/ActiveRepairService.java  |  42 +++--
 .../cassandra/streaming/StreamSession.java      |   1 -
 5 files changed, 151 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b601bb9,ea2ecc0..6d60699
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,5 +1,59 @@@
 +3.0
 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
 + * Support direct buffer decompression for reads (CASSANDRA-8464)
 + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
 + * Add role based access control (CASSANDRA-7653)
 + * Group sstables for anticompaction correctly (CASSANDRA-8578)
 + * Add ReadFailureException to native protocol, respond
 +   immediately when replicas encounter errors while handling
 +   a read request (CASSANDRA-7886)
 + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
 + * Allow mixing token and partition key restrictions (CASSANDRA-7016)
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup 
(CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 
7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.3
+  * Don't allow starting multiple inc repairs on the same sstables 
(CASSANDRA-8316)
   * Invalidate prepared BATCH statements when related tables
     or keyspaces are dropped (CASSANDRA-8652)
   * Fix missing results in secondary index queries on collections

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2bba863,02f5e81..1e06f5e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -391,9 -414,9 +416,9 @@@ public class CompactionManager implemen
      public void performAnticompaction(ColumnFamilyStore cfs,
                                        Collection<Range<Token>> ranges,
                                        Collection<SSTableReader> 
validatedForRepair,
 -                                      long repairedAt) throws 
InterruptedException, ExecutionException, IOException
 +                                      long repairedAt) throws 
InterruptedException, IOException
      {
-         logger.info("Starting anticompaction for {}/{}", 
cfs.keyspace.getName(), cfs.getColumnFamilyName());
+         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", 
cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), 
cfs.getSSTables().size());
          logger.debug("Starting anticompaction for ranges {}", ranges);
          Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
          Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
@@@ -928,21 -921,12 +953,28 @@@
              {
                  // flush first so everyone is validating data that is as 
similar as possible
                  
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
 -                // we don't mark validating sstables as compacting in 
DataTracker, so we have to mark them referenced
 -                // instead so they won't be cleaned up if they do get 
compacted during the validation
 -                if (validator.desc.parentSessionId == null || 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId)
 == null)
 -                    sstables = cfs.markCurrentSSTablesReferenced();
 -                else
 -                    sstables = 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 +                ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
 +                Set<SSTableReader> sstablesToValidate = new HashSet<>();
 +                for (SSTableReader sstable : cfs.getSSTables())
 +                {
 +                    if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Arrays.asList(validator.desc.range)))
 +                    {
 +                        if (!prs.isIncremental || !sstable.isRepaired())
 +                        {
 +                            sstablesToValidate.add(sstable);
 +                        }
 +                    }
 +                }
++                Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, 
validator.desc.parentSessionId);
++
++                if (!Sets.intersection(currentlyRepairing, 
sstablesToValidate).isEmpty())
++                {
++                    logger.error("Cannot start multiple repair sessions over 
the same sstables");
++                    throw new RuntimeException("Cannot start multiple repair 
sessions over the same sstables");
++                }
 +                prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
 +
 +                sstables = 
prs.getAndReferenceSSTablesInRange(cfs.metadata.cfId, validator.desc.range);
  
                  if (validator.gcBefore > 0)
                      gcBefore = validator.gcBefore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 1880e8e,c7cf4c8..be685cd
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -26,11 -24,6 +26,12 @@@ import java.util.UUID
  import java.util.concurrent.Future;
  
  import com.google.common.base.Predicate;
++import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.dht.Bounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -60,87 -57,88 +61,103 @@@ public class RepairMessageVerbHandler i
      {
          // TODO add cancel/interrupt message
          RepairJobDesc desc = message.payload.desc;
-         switch (message.payload.messageType)
+         try
          {
-             case PREPARE_MESSAGE:
-                 PrepareMessage prepareMessage = (PrepareMessage) 
message.payload;
-                 logger.debug("Preparing, {}", prepareMessage);
-                 List<ColumnFamilyStore> columnFamilyStores = new 
ArrayList<>(prepareMessage.cfIds.size());
-                 for (UUID cfId : prepareMessage.cfIds)
-                 {
-                     Pair<String, String> kscf = Schema.instance.getCF(cfId);
-                     ColumnFamilyStore columnFamilyStore = 
Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-                     columnFamilyStores.add(columnFamilyStore);
-                 }
-                 
ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
-                                                                          
columnFamilyStores,
-                                                                          
prepareMessage.ranges,
-                                                                          
prepareMessage.isIncremental);
-                 MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
-                 break;
+             switch (message.payload.messageType)
+             {
+                 case PREPARE_MESSAGE:
+                     PrepareMessage prepareMessage = (PrepareMessage) 
message.payload;
++                    logger.debug("Preparing, {}", prepareMessage);
+                     List<ColumnFamilyStore> columnFamilyStores = new 
ArrayList<>(prepareMessage.cfIds.size());
+                     for (UUID cfId : prepareMessage.cfIds)
+                     {
+                         Pair<String, String> kscf = 
Schema.instance.getCF(cfId);
+                         ColumnFamilyStore columnFamilyStore = 
Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+                         columnFamilyStores.add(columnFamilyStore);
+                     }
+                     
ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
+                             columnFamilyStores,
 -                            prepareMessage.ranges);
++                            prepareMessage.ranges,
++                            prepareMessage.isIncremental);
+                     MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                     break;
  
-             case SNAPSHOT:
-                 logger.debug("Snapshotting {}", desc);
-                 ColumnFamilyStore cfs = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
-                 final Range<Token> repairingRange = desc.range;
-                 Set<SSTableReader> snapshottedSSSTables = 
cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
-                 {
-                     public boolean apply(SSTableReader sstable)
+                 case SNAPSHOT:
++                    logger.debug("Snapshotting {}", desc);
+                     ColumnFamilyStore cfs = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                     final Range<Token> repairingRange = desc.range;
 -                    cfs.snapshot(desc.sessionId.toString(), new 
Predicate<SSTableReader>()
++                    Set<SSTableReader> snapshottedSSSTables = 
cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
+                     {
+                         public boolean apply(SSTableReader sstable)
+                         {
+                             return sstable != null &&
+                                     !(sstable.partitioner instanceof 
LocalPartitioner) && // exclude SSTables from 2i
+                                     new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+                         }
+                     });
 -
++                    Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, 
desc.parentSessionId);
++                    if (!Sets.intersection(currentlyRepairing, 
snapshottedSSSTables).isEmpty())
 +                    {
-                         return sstable != null &&
-                                !(sstable.partitioner instanceof 
LocalPartitioner) && // exclude SSTables from 2i
-                                new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
++                        logger.error("Cannot start multiple repair sessions 
over the same sstables");
++                        throw new RuntimeException("Cannot start multiple 
repair sessions over the same sstables");
 +                    }
-                 });
-                 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId,
 snapshottedSSSTables);
-                 logger.debug("Enqueuing response to snapshot request {} to 
{}", desc.sessionId, message.from);
-                 MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
-                 break;
++                    
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId,
 snapshottedSSSTables);
+                     logger.debug("Enqueuing response to snapshot request {} 
to {}", desc.sessionId, message.from);
+                     MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                     break;
  
-             case VALIDATION_REQUEST:
-                 ValidationRequest validationRequest = (ValidationRequest) 
message.payload;
-                 logger.debug("Validating {}", validationRequest);
-                 // trigger read-only compaction
-                 ColumnFamilyStore store = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                 case VALIDATION_REQUEST:
+                     ValidationRequest validationRequest = (ValidationRequest) 
message.payload;
++                    logger.debug("Validating {}", validationRequest);
+                     // trigger read-only compaction
+                     ColumnFamilyStore store = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
  
-                 Validator validator = new Validator(desc, message.from, 
validationRequest.gcBefore);
-                 CompactionManager.instance.submitValidation(store, validator);
-                 break;
+                     Validator validator = new Validator(desc, message.from, 
validationRequest.gcBefore);
+                     CompactionManager.instance.submitValidation(store, 
validator);
+                     break;
  
-             case SYNC_REQUEST:
-                 // forwarded sync request
-                 SyncRequest request = (SyncRequest) message.payload;
-                 logger.debug("Syncing {}", request);
-                 long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
-                 if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != 
null)
-                     repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
+                 case SYNC_REQUEST:
+                     // forwarded sync request
+                     SyncRequest request = (SyncRequest) message.payload;
 -                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request);
++                    logger.debug("Syncing {}", request);
++                    long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
++                    if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != 
null)
++                        repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
 +
-                 StreamingRepairTask task = new StreamingRepairTask(desc, 
request, repairedAt);
-                 task.run();
-                 break;
++                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request, repairedAt);
+                     task.run();
+                     break;
  
-             case ANTICOMPACTION_REQUEST:
-                 AnticompactionRequest anticompactionRequest = 
(AnticompactionRequest) message.payload;
-                 logger.debug("Got anticompaction request {}", 
anticompactionRequest);
-                 try
-                 {
-                     List<Future<?>> futures = 
ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession,
 anticompactionRequest.successfulRanges);
-                     FBUtilities.waitOnFutures(futures);
-                 }
-                 catch (Exception e)
-                 {
-                     throw new RuntimeException(e);
-                 }
-                 finally
-                 {
-                     
ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
-                 }
+                 case ANTICOMPACTION_REQUEST:
 -                    logger.debug("Got anticompaction request");
+                     AnticompactionRequest anticompactionRequest = 
(AnticompactionRequest) message.payload;
++                    logger.debug("Got anticompaction request {}", 
anticompactionRequest);
+                     try
+                     {
 -                        List<Future<?>> futures = 
ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
++                        List<Future<?>> futures = 
ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession,
 anticompactionRequest.successfulRanges);
+                         FBUtilities.waitOnFutures(futures);
+                     }
+                     catch (Exception e)
+                     {
+                         throw new RuntimeException(e);
+                     }
+                     finally
+                     {
+                         
ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
+                     }
  
-                 break;
+                     break;
  
-             default:
-                 ActiveRepairService.instance.handleMessage(message.from, 
message.payload);
-                 break;
+                 default:
+                     ActiveRepairService.instance.handleMessage(message.from, 
message.payload);
+                     break;
+             }
+         }
+         catch (Exception e)
+         {
+             logger.error("Got error, removing parent repair session");
 -            if (desc!=null && desc.parentSessionId != null)
++            if (desc != null && desc.parentSessionId != null)
+                 
ActiveRepairService.instance.removeParentRepairSession(desc.parentSessionId);
+             throw new RuntimeException(e);
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index e4b7fff,36f7c5c..35ddeef
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -103,48 -116,34 +103,48 @@@ public class ActiveRepairServic
       *
       * @return Future for asynchronous call or null if there is no need to 
repair
       */
 -    public RepairFuture submitRepairSession(UUID parentRepairSession, 
Range<Token> range, String keyspace, RepairParallelism parallelismDegree, 
Set<InetAddress> endpoints, String... cfnames)
 +    public RepairSession submitRepairSession(UUID parentRepairSession,
 +                                             Range<Token> range,
 +                                             String keyspace,
 +                                             RepairParallelism 
parallelismDegree,
 +                                             Set<InetAddress> endpoints,
 +                                             long repairedAt,
 +                                             ListeningExecutorService 
executor,
 +                                             String... cfnames)
      {
 -        RepairSession session = new RepairSession(parentRepairSession, range, 
keyspace, parallelismDegree, endpoints, cfnames);
 -        if (session.endpoints.isEmpty())
 +        if (endpoints.isEmpty())
              return null;
 -        RepairFuture futureTask = new RepairFuture(session);
 -        executor.execute(futureTask);
 -        return futureTask;
 -    }
  
 -    public void addToActiveSessions(RepairSession session)
 -    {
 +        final RepairSession session = new RepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, 
repairedAt, cfnames);
 +
          sessions.put(session.getId(), session);
 -        Gossiper.instance.register(session);
 -        
FailureDetector.instance.registerFailureDetectionEventListener(session);
 -    }
 +        // register listeners
 +        gossiper.register(session);
 +        failureDetector.registerFailureDetectionEventListener(session);
  
 -    public void removeFromActiveSessions(RepairSession session)
 -    {
 -        Gossiper.instance.unregister(session);
 -        sessions.remove(session.getId());
 +        // unregister listeners at completion
 +        session.addListener(new Runnable()
 +        {
 +            /**
 +             * When repair finished, do clean up
 +             */
 +            public void run()
 +            {
 +                
failureDetector.unregisterFailureDetectionEventListener(session);
 +                gossiper.unregister(session);
 +                sessions.remove(session.getId());
 +            }
 +        }, MoreExecutors.sameThreadExecutor());
 +        session.start(executor);
 +        return session;
      }
  
-     public void terminateSessions()
+     public synchronized void terminateSessions()
      {
 +        Throwable cause = new IOException("Terminate session is called");
          for (RepairSession session : sessions.values())
          {
 -            session.forceShutdown();
 +            session.forceShutdown(cause);
          }
          parentRepairSessions.clear();
      }
@@@ -229,10 -241,10 +229,10 @@@
          return neighbors;
      }
  
-     public UUID prepareForRepair(Set<InetAddress> endpoints, RepairOption 
options, List<ColumnFamilyStore> columnFamilyStores)
 -    public synchronized UUID prepareForRepair(Set<InetAddress> endpoints, 
Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
++    public synchronized UUID prepareForRepair(Set<InetAddress> endpoints, 
RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
      {
          UUID parentRepairSession = UUIDGen.getTimeUUID();
 -        registerParentRepairSession(parentRepairSession, columnFamilyStores, 
ranges);
 +        registerParentRepairSession(parentRepairSession, columnFamilyStores, 
options.getRanges(), options.isIncremental());
          final CountDownLatch prepareLatch = new 
CountDownLatch(endpoints.size());
          final AtomicBoolean status = new AtomicBoolean(true);
          final Set<String> failedNodes = Collections.synchronizedSet(new 
HashSet<String>());
@@@ -285,12 -297,46 +285,24 @@@
          return parentRepairSession;
      }
  
 -    public synchronized void registerParentRepairSession(UUID 
parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, 
Collection<Range<Token>> ranges)
 +    public void registerParentRepairSession(UUID parentRepairSession, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental)
      {
 -        Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>();
 -        for (ColumnFamilyStore cfs : columnFamilyStores)
 -        {
 -            Set<SSTableReader> sstables = new HashSet<>();
 -            Set<SSTableReader> currentlyRepairing = 
currentlyRepairing(cfs.metadata.cfId);
 -            for (SSTableReader sstable : cfs.getSSTables())
 -            {
 -                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges))
 -                {
 -                    if (!sstable.isRepaired())
 -                    {
 -                        if (currentlyRepairing.contains(sstable))
 -                        {
 -                            logger.error("Already repairing "+sstable+", can 
not continue.");
 -                            throw new RuntimeException("Already repairing 
"+sstable+", can not continue.");
 -                        }
 -                        sstables.add(sstable);
 -                    }
 -                }
 -            }
 -            sstablesToRepair.put(cfs.metadata.cfId, sstables);
 -        }
 -        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, 
System.currentTimeMillis()));
 +        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, isIncremental, 
System.currentTimeMillis()));
      }
  
 -    private Set<SSTableReader> currentlyRepairing(UUID cfId)
++    public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID 
parentRepairSession)
+     {
+         Set<SSTableReader> repairing = new HashSet<>();
+         for (Map.Entry<UUID, ParentRepairSession> entry : 
parentRepairSessions.entrySet())
+         {
+             Collection<SSTableReader> sstables = 
entry.getValue().sstableMap.get(cfId);
 -            if (sstables != null)
++            if (sstables != null && 
!entry.getKey().equals(parentRepairSession))
+                 repairing.addAll(sstables);
+         }
+         return repairing;
+     }
+ 
 -    public synchronized void finishParentSession(UUID parentSession, 
Set<InetAddress> neighbors, boolean doAntiCompaction) throws 
InterruptedException, ExecutionException, IOException
 +    public void finishParentSession(UUID parentSession, Set<InetAddress> 
neighbors, Collection<Range<Token>> successfulRanges)
      {
          try
          {
@@@ -323,29 -372,13 +335,17 @@@
      {
          assert parentRepairSession != null;
          ParentRepairSession prs = getParentRepairSession(parentRepairSession);
 +        assert prs.ranges.containsAll(successfulRanges) : "Trying to perform 
anticompaction on unknown ranges";
  
          List<Future<?>> futures = new ArrayList<>();
 +        // if we don't have successful repair ranges, then just skip 
anticompaction
 +        if (successfulRanges.isEmpty())
 +            return futures;
          for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : 
prs.columnFamilyStores.entrySet())
          {
- 
              Collection<SSTableReader> sstables = new 
HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
              ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
-             boolean success = false;
-             while (!success)
-             {
-                 for (SSTableReader compactingSSTable : 
cfs.getDataTracker().getCompacting())
-                 {
-                     if (sstables.remove(compactingSSTable))
-                         
SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
-                 }
-                 success = sstables.isEmpty() || 
cfs.getDataTracker().markCompacting(sstables);
-             }
- 
 -            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, 
prs.ranges, sstables, prs.repairedAt));
 +            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, 
successfulRanges, sstables, prs.repairedAt));
          }
  
          return futures;
@@@ -419,19 -442,15 +419,29 @@@
              return sstables;
          }
  
 +        public synchronized Set<SSTableReader> 
getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range)
 +        {
-             Collection<SSTableReader> allSSTables= 
getAndReferenceSSTables(cfId);
++            Collection<SSTableReader> allSSTables = 
getAndReferenceSSTables(cfId);
 +            Set<SSTableReader> sstables = new HashSet<>();
 +            for (SSTableReader sstable : allSSTables)
 +            {
 +                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Arrays.asList(range)))
 +                    sstables.add(sstable);
 +                else
 +                    sstable.releaseReference();
 +            }
 +            return sstables;
++        }
 +
+         @Override
+         public String toString()
+         {
+             return "ParentRepairSession{" +
+                     "columnFamilyStores=" + columnFamilyStores +
+                     ", ranges=" + ranges +
+                     ", sstableMap=" + sstableMap +
+                     ", repairedAt=" + repairedAt +
+                     '}';
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

Reply via email to