Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ad84add7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ad84add7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ad84add7 Branch: refs/heads/cassandra-3.0 Commit: ad84add7268ae31f4af681ec9f726a2efe532cda Parents: f7aaea0 9f19dd4 Author: Marcus Eriksson <marc...@apache.org> Authored: Wed Dec 2 08:55:36 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Dec 2 08:55:36 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 4 +- src/java/org/apache/cassandra/dht/Range.java | 42 +++++++++++++ .../org/apache/cassandra/dht/RangeTest.java | 65 ++++++++++++++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad84add7/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 1af2745,eaad3a2..1b3838d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -24,7 -8,17 +24,8 @@@ Merged from 2.2 * Fix SimpleDateType type compatibility (CASSANDRA-10027) * (Hadoop) fix splits calculation (CASSANDRA-10640) * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) - * Use most up-to-date version of schema for system tables (CASSANDRA-10652) - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) - * Expose phi values from failure detector via JMX and tweak debug - and trace logging (CASSANDRA-9526) - * Fix RangeNamesQueryPager (CASSANDRA-10509) - * Deprecate Pig support (CASSANDRA-10542) - * Reduce contention getting instances of CompositeType (CASSANDRA-10433) - * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) Merged from 2.1: + * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768) * Add proper error handling to stream receiver (CASSANDRA-10774) * Warn or fail when changing cluster topology live (CASSANDRA-10243) * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad84add7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 7cf78bc,65f93c0..3ce7d2c --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1193,31 -1208,35 +1193,31 @@@ public class CompactionManager implemen logger.info("Anticompacting {}", anticompactionGroup); Set<SSTableReader> sstableAsSet = anticompactionGroup.originals(); - File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); + File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); long repairedKeyCount = 0; long unrepairedKeyCount = 0; - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); + int nowInSec = FBUtilities.nowInSeconds(); + + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); + try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(anticompactionGroup, groupMaxDataAge, false, false); + SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(anticompactionGroup, groupMaxDataAge, false, false); AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals()); - CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs))) + CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec)); + CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { - int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); - - repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); - unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); + int expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); - CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()); - metrics.beginCompaction(ci); - try + repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, anticompactionGroup)); + unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, anticompactionGroup)); - ++ Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges); + while (ci.hasNext()) { - @SuppressWarnings("resource") - CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); - Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges); - while (iter.hasNext()) + try (UnfilteredRowIterator partition = ci.next()) { - @SuppressWarnings("resource") - AbstractCompactedRow row = iter.next(); // if current range from sstable is repaired, save it into the new repaired sstable - if (Range.isInRanges(partition.partitionKey().getToken(), ranges)) - if (containmentChecker.contains(row.key.getToken())) ++ if (containmentChecker.contains(partition.partitionKey().getToken())) { - repairedSSTableWriter.append(row); + repairedSSTableWriter.append(partition); repairedKeyCount++; } // otherwise save into the new 'non-repaired' table http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad84add7/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad84add7/test/unit/org/apache/cassandra/dht/RangeTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/dht/RangeTest.java index 7888b85,85f2586..9c87981 --- a/test/unit/org/apache/cassandra/dht/RangeTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeTest.java @@@ -18,19 -19,25 +18,24 @@@ package org.apache.cassandra.dht; import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.Collections; import java.util.HashSet; import java.util.List; + import java.util.Random; import java.util.Set; + import com.google.common.base.Joiner; - -import static java.util.Arrays.asList; - import org.apache.commons.lang3.StringUtils; import org.junit.Test; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; + +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; +import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; +import static java.util.Arrays.asList; import static org.apache.cassandra.Util.range; + import static org.junit.Assert.*; public class RangeTest