Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
	src/java/org/apache/cassandra/service/ActiveRepairService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d65d264 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d65d264 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d65d264 Branch: refs/heads/trunk Commit: 6d65d264eed638e86d19c102a2095e3e42f9deb4 Parents: b66a547 6d9d175 Author: Marcus Eriksson <marc...@apache.org> Authored: Wed Jan 21 10:24:02 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Jan 21 10:24:02 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 42 ++++- .../repair/RepairMessageVerbHandler.java | 157 ++++++++++--------- .../cassandra/service/ActiveRepairService.java | 42 +++-- .../cassandra/streaming/StreamSession.java | 1 - 5 files changed, 151 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index b601bb9,ea2ecc0..6d60699 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,58 -1,5 +1,59 @@@ +3.0 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560) + * Support direct buffer decompression for reads (CASSANDRA-8464) + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039) + * Add role based access control (CASSANDRA-7653) + * Group sstables for anticompaction correctly (CASSANDRA-8578) + * Add ReadFailureException to native protocol, respond + immediately when replicas encounter errors while handling + a read request (CASSANDRA-7886) + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308) + * Allow mixing token and partition key restrictions (CASSANDRA-7016) + * Support index key/value entries on map collections (CASSANDRA-8473) + * Modernize schema tables (CASSANDRA-8261) + * 
Support for user-defined aggregation functions (CASSANDRA-8053) + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419) + * Refactor SelectStatement, return IN results in natural order instead + of IN value list order (CASSANDRA-7981) + * Support UDTs, tuples, and collections in user-defined + functions (CASSANDRA-7563) + * Fix aggregate fn results on empty selection, result column name, + and cqlsh parsing (CASSANDRA-8229) + * Mark sstables as repaired after full repair (CASSANDRA-7586) + * Extend Descriptor to include a format value and refactor reader/writer + APIs (CASSANDRA-7443) + * Integrate JMH for microbenchmarks (CASSANDRA-8151) + * Keep sstable levels when bootstrapping (CASSANDRA-7460) + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838) + * Support for aggregation functions (CASSANDRA-4914) + * Remove cassandra-cli (CASSANDRA-7920) + * Accept dollar quoted strings in CQL (CASSANDRA-7769) + * Make assassinate a first class command (CASSANDRA-7935) + * Support IN clause on any clustering column (CASSANDRA-4762) + * Improve compaction logging (CASSANDRA-7818) + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917) + * Do anticompaction in groups (CASSANDRA-6851) + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929, + 7924, 7812, 8063, 7813, 7708) + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) + * Move sstable RandomAccessReader to nio2, which allows using the + FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) + * Remove CQL2 (CASSANDRA-5918) + * Add Thrift get_multi_slice call (CASSANDRA-6757) + * Optimize fetching multiple cells by name (CASSANDRA-6933) + * Allow compilation in java 8 (CASSANDRA-7028) + * Make incremental repair default (CASSANDRA-7250) + * Enable code coverage thru JaCoCo (CASSANDRA-7226) + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) + * Shorten SSTable path (CASSANDRA-6962) + * Use unsafe mutations for 
most unit tests (CASSANDRA-6969) + * Fix race condition during calculation of pending ranges (CASSANDRA-7390) + * Fail on very large batch sizes (CASSANDRA-8011) + * Improve concurrency of repair (CASSANDRA-6455, 8208) + + 2.1.3 + * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316) * Invalidate prepared BATCH statements when related tables or keyspaces are dropped (CASSANDRA-8652) * Fix missing results in secondary index queries on collections http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 2bba863,02f5e81..1e06f5e --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -391,9 -414,9 +416,9 @@@ public class CompactionManager implemen public void performAnticompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<SSTableReader> validatedForRepair, - long repairedAt) throws InterruptedException, ExecutionException, IOException + long repairedAt) throws InterruptedException, IOException { - logger.info("Starting anticompaction for {}/{}", cfs.keyspace.getName(), cfs.getColumnFamilyName()); + logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size()); logger.debug("Starting anticompaction for ranges {}", ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); @@@ -928,21 -921,12 +953,28 @@@ { // flush first so everyone is validating data that is as similar as possible StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); - // we don't mark validating sstables as compacting in 
DataTracker, so we have to mark them referenced - // instead so they won't be cleaned up if they do get compacted during the validation - if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null) - sstables = cfs.markCurrentSSTablesReferenced(); - else - sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); + Set<SSTableReader> sstablesToValidate = new HashSet<>(); + for (SSTableReader sstable : cfs.getSSTables()) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Arrays.asList(validator.desc.range))) + { + if (!prs.isIncremental || !sstable.isRepaired()) + { + sstablesToValidate.add(sstable); + } + } + } ++ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); ++ ++ if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty()) ++ { ++ logger.error("Cannot start multiple repair sessions over the same sstables"); ++ throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); ++ } + prs.addSSTables(cfs.metadata.cfId, sstablesToValidate); + + sstables = prs.getAndReferenceSSTablesInRange(cfs.metadata.cfId, validator.desc.range); if (validator.gcBefore > 0) gcBefore = validator.gcBefore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 1880e8e,c7cf4c8..be685cd --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -26,11 -24,6 +26,12 @@@ import java.util.UUID import java.util.concurrent.Future; import com.google.common.base.Predicate; ++import com.google.common.collect.Sets; + +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -60,87 -57,88 +61,103 @@@ public class RepairMessageVerbHandler i { // TODO add cancel/interrupt message RepairJobDesc desc = message.payload.desc; - switch (message.payload.messageType) + try { - case PREPARE_MESSAGE: - PrepareMessage prepareMessage = (PrepareMessage) message.payload; - logger.debug("Preparing, {}", prepareMessage); - List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size()); - for (UUID cfId : prepareMessage.cfIds) - { - Pair<String, String> kscf = Schema.instance.getCF(cfId); - ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - columnFamilyStores.add(columnFamilyStore); - } - ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, - columnFamilyStores, - prepareMessage.ranges, - prepareMessage.isIncremental); - MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); - break; + switch (message.payload.messageType) + { + case PREPARE_MESSAGE: + PrepareMessage prepareMessage = (PrepareMessage) message.payload; ++ logger.debug("Preparing, {}", prepareMessage); + List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size()); + for (UUID cfId : prepareMessage.cfIds) + { + Pair<String, String> kscf = Schema.instance.getCF(cfId); + ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + columnFamilyStores.add(columnFamilyStore); + } + 
ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, + columnFamilyStores, - prepareMessage.ranges); ++ prepareMessage.ranges, ++ prepareMessage.isIncremental); + MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); + break; - case SNAPSHOT: - logger.debug("Snapshotting {}", desc); - ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); - final Range<Token> repairingRange = desc.range; - Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader sstable) + case SNAPSHOT: ++ logger.debug("Snapshotting {}", desc); + ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); + final Range<Token> repairingRange = desc.range; - cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() ++ Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader sstable) + { + return sstable != null && + !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i + new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); + } + }); - ++ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId); ++ if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty()) + { - return sstable != null && - !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); ++ logger.error("Cannot start multiple repair sessions over the same sstables"); ++ throw new RuntimeException("Cannot start multiple repair sessions over the same 
sstables"); + } - }); - ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); - logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); - MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); - break; ++ ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); + logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); + MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); + break; - case VALIDATION_REQUEST: - ValidationRequest validationRequest = (ValidationRequest) message.payload; - logger.debug("Validating {}", validationRequest); - // trigger read-only compaction - ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); + case VALIDATION_REQUEST: + ValidationRequest validationRequest = (ValidationRequest) message.payload; ++ logger.debug("Validating {}", validationRequest); + // trigger read-only compaction + ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); - Validator validator = new Validator(desc, message.from, validationRequest.gcBefore); - CompactionManager.instance.submitValidation(store, validator); - break; + Validator validator = new Validator(desc, message.from, validationRequest.gcBefore); + CompactionManager.instance.submitValidation(store, validator); + break; - case SYNC_REQUEST: - // forwarded sync request - SyncRequest request = (SyncRequest) message.payload; - logger.debug("Syncing {}", request); - long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; - if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) - repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt; + case SYNC_REQUEST: + // forwarded sync request + SyncRequest request = (SyncRequest) message.payload; - StreamingRepairTask task = new StreamingRepairTask(desc, request); ++ logger.debug("Syncing {}", request); ++ long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; ++ if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) ++ repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt; + - StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt); - task.run(); - break; ++ StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt); + task.run(); + break; - case ANTICOMPACTION_REQUEST: - AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload; - logger.debug("Got anticompaction request {}", anticompactionRequest); - try - { - List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges); - FBUtilities.waitOnFutures(futures); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - finally - { - ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession); - } + case ANTICOMPACTION_REQUEST: - logger.debug("Got anticompaction request"); + AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload; ++ logger.debug("Got anticompaction request {}", anticompactionRequest); + try + { - List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); ++ List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges); + FBUtilities.waitOnFutures(futures); + } + catch (Exception e) + { + throw new 
RuntimeException(e); + } + finally + { + ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession); + } - break; + break; - default: - ActiveRepairService.instance.handleMessage(message.from, message.payload); - break; + default: + ActiveRepairService.instance.handleMessage(message.from, message.payload); + break; + } + } + catch (Exception e) + { + logger.error("Got error, removing parent repair session"); - if (desc!=null && desc.parentSessionId != null) ++ if (desc != null && desc.parentSessionId != null) + ActiveRepairService.instance.removeParentRepairSession(desc.parentSessionId); + throw new RuntimeException(e); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index e4b7fff,36f7c5c..35ddeef --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -103,48 -116,34 +103,48 @@@ public class ActiveRepairServic * * @return Future for asynchronous call or null if there is no need to repair */ - public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String... cfnames) + public RepairSession submitRepairSession(UUID parentRepairSession, + Range<Token> range, + String keyspace, + RepairParallelism parallelismDegree, + Set<InetAddress> endpoints, + long repairedAt, + ListeningExecutorService executor, + String... 
cfnames) { - RepairSession session = new RepairSession(parentRepairSession, range, keyspace, parallelismDegree, endpoints, cfnames); - if (session.endpoints.isEmpty()) + if (endpoints.isEmpty()) return null; - RepairFuture futureTask = new RepairFuture(session); - executor.execute(futureTask); - return futureTask; - } - public void addToActiveSessions(RepairSession session) - { + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, cfnames); + sessions.put(session.getId(), session); - Gossiper.instance.register(session); - FailureDetector.instance.registerFailureDetectionEventListener(session); - } + // register listeners + gossiper.register(session); + failureDetector.registerFailureDetectionEventListener(session); - public void removeFromActiveSessions(RepairSession session) - { - Gossiper.instance.unregister(session); - sessions.remove(session.getId()); + // unregister listeners at completion + session.addListener(new Runnable() + { + /** + * When repair finished, do clean up + */ + public void run() + { + failureDetector.unregisterFailureDetectionEventListener(session); + gossiper.unregister(session); + sessions.remove(session.getId()); + } + }, MoreExecutors.sameThreadExecutor()); + session.start(executor); + return session; } - public void terminateSessions() + public synchronized void terminateSessions() { + Throwable cause = new IOException("Terminate session is called"); for (RepairSession session : sessions.values()) { - session.forceShutdown(); + session.forceShutdown(cause); } parentRepairSessions.clear(); } @@@ -229,10 -241,10 +229,10 @@@ return neighbors; } - public UUID prepareForRepair(Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) - public synchronized UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores) ++ public synchronized UUID 
prepareForRepair(Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { UUID parentRepairSession = UUIDGen.getTimeUUID(); - registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges); + registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>()); @@@ -285,12 -297,46 +285,24 @@@ return parentRepairSession; } - public synchronized void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges) + public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental) { - Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>(); - for (ColumnFamilyStore cfs : columnFamilyStores) - { - Set<SSTableReader> sstables = new HashSet<>(); - Set<SSTableReader> currentlyRepairing = currentlyRepairing(cfs.metadata.cfId); - for (SSTableReader sstable : cfs.getSSTables()) - { - if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges)) - { - if (!sstable.isRepaired()) - { - if (currentlyRepairing.contains(sstable)) - { - logger.error("Already repairing "+sstable+", can not continue."); - throw new RuntimeException("Already repairing "+sstable+", can not continue."); - } - sstables.add(sstable); - } - } - } - sstablesToRepair.put(cfs.metadata.cfId, sstables); - } - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, System.currentTimeMillis())); + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis())); } - private 
Set<SSTableReader> currentlyRepairing(UUID cfId) ++ public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession) + { + Set<SSTableReader> repairing = new HashSet<>(); + for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet()) + { + Collection<SSTableReader> sstables = entry.getValue().sstableMap.get(cfId); - if (sstables != null) ++ if (sstables != null && !entry.getKey().equals(parentRepairSession)) + repairing.addAll(sstables); + } + return repairing; + } + - public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException + public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges) { try { @@@ -323,29 -372,13 +335,17 @@@ { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); + assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; List<Future<?>> futures = new ArrayList<>(); + // if we don't have successful repair ranges, then just skip anticompaction + if (successfulRanges.isEmpty()) + return futures; for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) { - Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey())); ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); - boolean success = false; - while (!success) - { - for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting()) - { - if (sstables.remove(compactingSSTable)) - SSTableReader.releaseReferences(Arrays.asList(compactingSSTable)); - } - success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables); - } - - futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); + 
futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt)); } return futures; @@@ -419,19 -442,15 +419,29 @@@ return sstables; } + public synchronized Set<SSTableReader> getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range) + { - Collection<SSTableReader> allSSTables= getAndReferenceSSTables(cfId); ++ Collection<SSTableReader> allSSTables = getAndReferenceSSTables(cfId); + Set<SSTableReader> sstables = new HashSet<>(); + for (SSTableReader sstable : allSSTables) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Arrays.asList(range))) + sstables.add(sstable); + else + sstable.releaseReference(); + } + return sstables; ++ } + + @Override + public String toString() + { + return "ParentRepairSession{" + + "columnFamilyStores=" + columnFamilyStores + + ", ranges=" + ranges + + ", sstableMap=" + sstableMap + + ", repairedAt=" + repairedAt + + '}'; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d65d264/src/java/org/apache/cassandra/streaming/StreamSession.java ----------------------------------------------------------------------