This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e53ad6461224b1ab096f56d9e2c744126eb532cd Merge: f0bb299 b58a5c8 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu Feb 17 10:30:05 2022 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionTask.java | 6 +- .../db/compaction/LeveledCompactionTask.java | 45 +++++- .../compaction/writers/CompactionAwareWriter.java | 7 +- .../writers/MajorLeveledCompactionWriter.java | 6 + .../compaction/writers/MaxSSTableSizeWriter.java | 6 + .../SplittingSizeTieredCompactionWriter.java | 8 +- .../compaction/LeveledCompactionStrategyTest.java | 152 +++++++++++++++++++++ .../{ => writers}/CompactionAwareWriterTest.java | 48 ++++++- 9 files changed, 267 insertions(+), 12 deletions(-) diff --cc CHANGES.txt index 402048f,4d07a3d..513a8af --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,21 -1,9 +1,22 @@@ -3.0.27 - * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272) +3.11.13 +Merged from 3.0: * Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction (CASSANDRA-17273) ++ * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272) -3.0.26 +3.11.12 + * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA=17028) + * Add key validation to ssstablescrub (CASSANDRA-16969) + * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851) + * Include SASI components to snapshots (CASSANDRA-15134) + * Make assassinate more resilient to missing tokens (CASSANDRA-16847) + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) + * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135) + * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835) + * Fix ant-junit dependency issue (CASSANDRA-16827) + * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072) + * Avoid sending CDC column if not enabled (CASSANDRA-16770) +Merged from 3.0: * Fix conversion from megabits to bytes in streaming rate limiter (CASSANDRA-17243) * Upgrade logback to 1.2.9 (CASSANDRA-17204) * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673) diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index bc6115e,1ceed1c..7ffe33a --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@@ -158,26 -124,9 +158,26 @@@ public abstract class CompactionAwareWr */ protected void maybeSwitchWriter(DecoratedKey key) { - if (!isInitialized) - switchCompactionLocation(getDirectories().getWriteableLocation(getExpectedWriteSize())); - isInitialized = true; + if (diskBoundaries == null) + { + if (locationIndex < 0) + { - Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, OperationType.UNKNOWN)); ++ Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, getExpectedWriteSize()); + switchCompactionLocation(defaultLocation); + locationIndex = 0; + } + return; + } + + if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0) + return; + + int prevIdx = locationIndex; + while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0) + locationIndex++; + if (prevIdx >= 0) + logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex)); + switchCompactionLocation(locations.get(locationIndex)); } /** @@@ -198,45 -147,18 +198,50 @@@ /** * Return a directory where we can expect expectedWriteSize to fit. + * + * @param sstables the sstables to compact + * @return */ - public Directories.DataDirectory getWriteDirectory(long expectedWriteSize) + public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize) { - Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize); - if (directory == null) - throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes"); + Descriptor descriptor = null; + for (SSTableReader sstable : sstables) + { + if (descriptor == null) + descriptor = sstable.descriptor; + if (!descriptor.directory.equals(sstable.descriptor.directory)) + { + logger.trace("All sstables not from the same disk - putting results in {}", descriptor.directory); + break; + } + } + Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(descriptor); + if (d != null) + { + long availableSpace = d.getAvailableSpace(); + if (availableSpace < estimatedWriteSize) + throw new RuntimeException(String.format("Not enough space to write %s to %s (%s available)", + FBUtilities.prettyPrintMemory(estimatedWriteSize), + d.location, + FBUtilities.prettyPrintMemory(availableSpace))); + logger.trace("putting compaction results in {}", descriptor.directory); + return d; + } + d = getDirectories().getWriteableLocation(estimatedWriteSize); + if (d == null) + throw new RuntimeException(String.format("Not enough disk space to store %s", + FBUtilities.prettyPrintMemory(estimatedWriteSize))); + return d; + } - return directory; + public CompactionAwareWriter setRepairedAt(long repairedAt) + { + this.sstableWriter.setRepairedAt(repairedAt); + return this; } + + protected long getExpectedWriteSize() + { + return cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); + } } diff --cc src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 0beb505,3eee398..f1326e9 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@@ -100,21 -96,23 +100,27 @@@ public class MajorLeveledCompactionWrit } - public void switchCompactionLocation(Directories.DataDirectory directory) + @Override + public void switchCompactionLocation(Directories.DataDirectory location) { - File sstableDirectory = getDirectories().getLocationForDisk(directory); - @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), - averageEstimatedKeysPerSSTable, - minRepairedAt, - cfs.metadata, - new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables), - txn); - sstableWriter.switchWriter(writer); + this.sstableDirectory = location; + averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1)); + sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))), + keysPerSSTable, + minRepairedAt, + cfs.metadata, + new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel), + SerializationHeader.make(cfs.metadata, txn.originals()), + cfs.indexManager.listIndexes(), + txn)); + partitionsWritten = 0; + sstablesWritten = 0; + } + + @Override + protected long getExpectedWriteSize() + { - return expectedWriteSize; ++ return Math.min(maxSSTableSize, super.getExpectedWriteSize()); + } } diff --cc src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 864185e,d76381a..36c69b0 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@@ -118,5 -103,12 +118,11 @@@ public class MaxSSTableSizeWriter exten txn); sstableWriter.switchWriter(writer); - } + + @Override + protected long getExpectedWriteSize() + { - return expectedWriteSize; ++ return Math.min(maxSSTableSize, super.getExpectedWriteSize()); + } } diff --cc src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 46cb891,77672d8..b4d7097 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@@ -88,12 -92,11 +88,12 @@@ public class SplittingSizeTieredCompact public boolean realAppend(UnfilteredRowIterator partition) { RowIndexEntry rie = sstableWriter.append(partition); - if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect + if (sstableWriter.currentWriter().getEstimatedOnDiskBytesWritten() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect { currentRatioIndex++; - currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]); + currentBytesToWrite = getExpectedWriteSize(); - switchCompactionLocation(getWriteDirectory(currentBytesToWrite)); + switchCompactionLocation(location); + logger.debug("Switching writer, currentBytesToWrite = {}", currentBytesToWrite); } return rie != null; } @@@ -114,5 -114,11 +114,11 @@@ txn); logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); sstableWriter.switchWriter(writer); - } + ++ @Override + protected long getExpectedWriteSize() + { + return Math.round(totalSize * ratios[currentRatioIndex]); + } } diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index eba243f,5bbc931..91b9b3b --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@@ -28,16 -29,17 +29,21 @@@ import java.util.Iterator import java.util.List; import java.util.Map; import java.util.Random; + import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; +import junit.framework.Assert; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; ++ ++import com.google.common.collect.Iterables; ++import com.google.common.collect.Sets; ++import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.junit.Test; import org.junit.runner.RunWith; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -62,11 -64,12 +68,14 @@@ import org.apache.cassandra.schema.Comp import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.Pair; +import static java.util.Collections.singleton; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertNotNull; + import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @RunWith(OrderedJUnit4ClassRunner.class) @@@ -376,351 -380,146 +385,494 @@@ public class LeveledCompactionStrategyT assertFalse(repaired.manifest.getLevel(1).contains(sstable2)); } + + + @Test + public void testTokenRangeCompaction() throws Exception + { + // Remove any existing data so we can start out clean with predictable number of sstables + cfs.truncateBlocking(); + + // Disable auto compaction so cassandra does not compact + CompactionManager.instance.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files + + DecoratedKey key1 = Util.dk(String.valueOf(1)); + DecoratedKey key2 = Util.dk(String.valueOf(2)); + List<DecoratedKey> keys = new ArrayList<>(Arrays.asList(key1, key2)); + int numIterations = 10; + int columns = 2; + + // Add enough data to trigger multiple sstables. + + // create 10 sstables that contain data for both key1 and key2 + for (int i = 0; i < numIterations; i++) { + for (DecoratedKey key : keys) { + UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key); + for (int c = 0; c < columns; c++) + update.newRow("column" + c).add("val", value); + update.applyUnsafe(); + } + cfs.forceBlockingFlush(); + } + + // create 20 more sstables with 10 containing data for key1 and other 10 containing data for key2 + for (int i = 0; i < numIterations; i++) { + for (DecoratedKey key : keys) { + UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key); + for (int c = 0; c < columns; c++) + update.newRow("column" + c).add("val", value); + update.applyUnsafe(); + cfs.forceBlockingFlush(); + } + } + + // We should have a total of 30 sstables by now + assertEquals(30, cfs.getLiveSSTables().size()); + + // Compact just the tables with key2 + // Bit hackish to use the key1.token as the prior key but works in BytesToken + Range<Token> tokenRange = new Range<>(key2.getToken(), key2.getToken()); + Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange)); + cfs.forceCompactionForTokenRange(tokenRanges); + + while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) { + Thread.sleep(100); + } + + // 20 tables that have key2 should have been compacted in to 1 table resulting in 11 (30-20+1) + assertEquals(11, cfs.getLiveSSTables().size()); + + // Compact just the tables with key1. At this point all 11 tables should have key1 + Range<Token> tokenRange2 = new Range<>(key1.getToken(), key1.getToken()); + Collection<Range<Token>> tokenRanges2 = new ArrayList<>(Arrays.asList(tokenRange2)); + cfs.forceCompactionForTokenRange(tokenRanges2); + + + while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) { + Thread.sleep(100); + } + + // the 11 tables containing key1 should all compact to 1 table + assertEquals(1, cfs.getLiveSSTables().size()); + } + + @Test + public void testCompactionCandidateOrdering() throws Exception + { + // add some data + byte [] b = new byte[100 * 1024]; + new Random().nextBytes(b); + ByteBuffer value = ByteBuffer.wrap(b); + int rows = 4; + int columns = 10; + // Just keep sstables in L0 for this test + cfs.disableAutoCompaction(); + for (int r = 0; r < rows; r++) + { + UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r)); + for (int c = 0; c < columns; c++) + update.newRow("column" + c).add("val", value); + update.applyUnsafe(); + cfs.forceBlockingFlush(); + } + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0); + // get readers for level 0 sstables + Collection<SSTableReader> sstables = strategy.manifest.getLevel(0); + Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables); + assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1); + long lastMaxTimeStamp = Long.MIN_VALUE; + for (SSTableReader sstable : sortedCandidates) + { + assertTrue(String.format("SStables not sorted into oldest to newest by maxTimestamp. Current sstable: %d , last sstable: %d", sstable.getMaxTimestamp(), lastMaxTimeStamp), + sstable.getMaxTimestamp() > lastMaxTimeStamp); + lastMaxTimeStamp = sstable.getMaxTimestamp(); + } + } + + @Test + public void testAddingOverlapping() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + List<SSTableReader> currentLevel = new ArrayList<>(); + int gen = 1; + currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs)); + + lm.addSSTables(currentLevel); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5)); + + List<SSTableReader> newSSTables = new ArrayList<>(); + // this sstable last token is the same as the first token of L1 above, should get sent to L0: + newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs)); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs)); + lm.addSSTables(newSSTables); + List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 3)); + newL1.add(newSSTables.get(1)); + assertLevelsEqual(lm.getLevel(1), newL1); + newSSTables.remove(1); + assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 0)); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + } + + @Test + public void singleTokenSSTableTest() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + List<SSTableReader> expectedL1 = new ArrayList<>(); + + int gen = 1; + // single sstable, single token (100) + expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs)); + lm.addSSTables(expectedL1); + + List<SSTableReader> expectedL0 = new ArrayList<>(); + + // should get moved to L0: + expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs)); + lm.addSSTables(expectedL0); + + assertLevelsEqual(expectedL0, lm.getLevel(0)); + assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 0)); + assertLevelsEqual(expectedL1, lm.getLevel(1)); + assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 1)); + + // should work: + expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs)); + expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs)); + lm.addSSTables(expectedL1.subList(1, expectedL1.size())); + assertLevelsEqual(expectedL1, lm.getLevel(1)); + } + + @Test + public void randomMultiLevelAddTest() + { + int iterations = 100; + int levelCount = 9; + + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + long seed = System.currentTimeMillis(); + Random r = new Random(seed); + List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r); + + int sstableCount = newLevels.size(); + lm.addSSTables(newLevels); + + int [] expectedLevelSizes = lm.getAllLevelSize(); + + for (int j = 0; j < iterations; j++) + { + newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r); + sstableCount += newLevels.size(); + + int[] canAdd = canAdd(lm, newLevels, levelCount); + for (int i = 0; i < levelCount; i++) + expectedLevelSizes[i] += canAdd[i]; + lm.addSSTables(newLevels); + } + + // and verify no levels overlap + int actualSSTableCount = 0; + for (int i = 0; i < levelCount; i++) + { + actualSSTableCount += lm.getLevelSize(i); + List<SSTableReader> level = new ArrayList<>(lm.getLevel(i)); + int lvl = i; + assertTrue(level.stream().allMatch(s -> s.getSSTableLevel() == lvl)); + if (i > 0) + { + level.sort(SSTableReader.sstableComparator); + SSTableReader prev = null; + for (SSTableReader sstable : level) + { + if (prev != null && sstable.first.compareTo(prev.last) <= 0) + { + String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", ")); + String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr); + Assert.fail("[seed = "+seed+"] overlap in level "+lvl+": " + overlap); + } + prev = sstable; + } + } + } + assertEquals(sstableCount, actualSSTableCount); + for (int i = 0; i < levelCount; i++) + assertEquals("[seed = " + seed + "] wrong sstable count in level = " + i, expectedLevelSizes[i], lm.getLevel(i).size()); + } + + private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r) + { + List<SSTableReader> newLevels = new ArrayList<>(); + for (int level = 0; level < levelCount; level++) + { + int numLevelSSTables = r.nextInt(maxSSTableCountPerLevel) + 1; + List<Integer> tokens = new ArrayList<>(numLevelSSTables * 2); + + for (int i = 0; i < numLevelSSTables * 2; i++) + tokens.add(r.nextInt(4000)); + Collections.sort(tokens); + for (int i = 0; i < tokens.size() - 1; i += 2) + { + SSTableReader sstable = MockSchema.sstableWithLevel(++startGen, tokens.get(i), tokens.get(i + 1), level, cfs); + newLevels.add(sstable); + } + } + return newLevels; + } + + /** + * brute-force checks if the new sstables can be added to the correct level in manifest + * + * @return count of expected sstables to add to each level + */ + private static int[] canAdd(LeveledManifest lm, List<SSTableReader> newSSTables, int levelCount) + { + Map<Integer, Collection<SSTableReader>> sstableGroups = new HashMap<>(); + newSSTables.forEach(s -> sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new ArrayList<>()).add(s)); + + int[] canAdd = new int[levelCount]; + for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : sstableGroups.entrySet()) + { + int level = lvlGroup.getKey(); + if (level == 0) + { + canAdd[0] += lvlGroup.getValue().size(); + continue; + } + + List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level)); + for (SSTableReader sstable : lvlGroup.getValue()) + { + newLevel.add(sstable); + newLevel.sort(SSTableReader.sstableComparator); + + SSTableReader prev = null; + boolean kept = true; + for (SSTableReader sst : newLevel) + { + if (prev != null && prev.last.compareTo(sst.first) >= 0) + { + newLevel.remove(sstable); + kept = false; + break; + } + prev = sst; + } + if (kept) + canAdd[level] += 1; + else + canAdd[0] += 1; + } + } + return canAdd; + } + + private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2) + { + assertEquals(l1.size(), l2.size()); + assertEquals(new HashSet<>(l1), new HashSet<>(l2)); + } + + @Test + public void testHighestLevelHasMoreDataThanSupported() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + int fanoutSize = 2; // to generate less sstables + LeveledManifest lm = new LeveledManifest(cfs, 1, fanoutSize, new SizeTieredCompactionStrategyOptions()); + + // generate data for L7 to trigger compaction + int l7 = 7; + int maxBytesForL7 = (int) (Math.pow(fanoutSize, l7) * 1024 * 1024); + int sstablesSizeForL7 = (int) (maxBytesForL7 * 1.001) + 1; + List<SSTableReader> sstablesOnL7 = Collections.singletonList(MockSchema.sstableWithLevel( 1, sstablesSizeForL7, l7, cfs)); + lm.addSSTables(sstablesOnL7); + + // generate data for L8 to trigger compaction + int l8 = 8; + int maxBytesForL8 = (int) (Math.pow(fanoutSize, l8) * 1024 * 1024); + int sstablesSizeForL8 = (int) (maxBytesForL8 * 1.001) + 1; + List<SSTableReader> sstablesOnL8 = Collections.singletonList(MockSchema.sstableWithLevel( 2, sstablesSizeForL8, l8, cfs)); + lm.addSSTables(sstablesOnL8); + + // compaction for L8 sstables is not supposed to be run because there is no upper level to promote sstables + // that's why we expect compaction candidates for L7 only + Collection<SSTableReader> compactionCandidates = lm.getCompactionCandidates().sstables; + assertThat(compactionCandidates).containsAll(sstablesOnL7); + assertThat(compactionCandidates).doesNotContainAnyElementsOf(sstablesOnL8); + } ++ + @Test + public void testReduceScopeL0L1() throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + Map<String, String> localOptions = new HashMap<>(); + localOptions.put("class", "LeveledCompactionStrategy"); + localOptions.put("sstable_size_in_mb", "1"); + cfs.setCompactionParameters(localOptions); + List<SSTableReader> l1sstables = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs); + l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1); + l1sstable.reloadSSTableMetadata(); + l1sstables.add(l1sstable); + } + List<SSTableReader> l0sstables = new ArrayList<>(); + for (int i = 10; i < 20; i++) + l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs)); - + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, Iterables.concat(l0sstables, l1sstables))) + { + CompactionTask task = new LeveledCompactionTask(cfs, txn, 1, 0, 1024*1024, false); + SSTableReader lastRemoved = null; + boolean removed = true; + for (int i = 0; i < l0sstables.size(); i++) + { + Set<SSTableReader> before = new HashSet<>(txn.originals()); + removed = task.reduceScopeForLimitedSpace(0); - SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null); ++ SSTableReader removedSSTable = Iterables.getOnlyElement(Sets.difference(before, txn.originals()), null); + if (removed) + { + assertNotNull(removedSSTable); + assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength()); + assertEquals(0, removedSSTable.getSSTableLevel()); + Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals()); + Set<SSTableReader> l1after = sstables.right; + + assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1 + assertEquals(before.size() - 1, txn.originals().size()); + lastRemoved = removedSSTable; + } + else + { + assertNull(removedSSTable); + Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals()); + Set<SSTableReader> l0after = sstables.left; + Set<SSTableReader> l1after = sstables.right; + assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1 + assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left + } + } + assertFalse(removed); + } + } + + @Test + public void testReduceScopeL0() + { + + List<SSTableReader> l0sstables = new ArrayList<>(); + for (int i = 10; i < 20; i++) + l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs)); + + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, l0sstables)) + { + CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024*1024, false); + + SSTableReader lastRemoved = null; + boolean removed = true; + for (int i = 0; i < l0sstables.size(); i++) + { + Set<SSTableReader> before = new HashSet<>(txn.originals()); + removed = task.reduceScopeForLimitedSpace(0); + SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null); + if (removed) + { + assertNotNull(removedSSTable); + assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength()); + assertEquals(0, removedSSTable.getSSTableLevel()); + assertEquals(before.size() - 1, txn.originals().size()); + lastRemoved = removedSSTable; + } + else + { + assertNull(removedSSTable); + Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals()); + Set<SSTableReader> l0after = sstables.left; + assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left + } + } + assertFalse(removed); + } + } + + @Test + public void testNoHighLevelReduction() throws IOException + { + List<SSTableReader> sstables = new ArrayList<>(); + int i = 1; + for (; i < 5; i++) + { + SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs); + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1); + sstable.reloadSSTableMetadata(); + sstables.add(sstable); + } + for (; i < 10; i++) + { + SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs); + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2); + sstable.reloadSSTableMetadata(); + sstables.add(sstable); + } + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables)) + { + CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024 * 1024, false); + assertFalse(task.reduceScopeForLimitedSpace(0)); + assertEquals(new HashSet<>(sstables), txn.originals()); + } + } + + private Pair<Set<SSTableReader>, Set<SSTableReader>> groupByLevel(Iterable<SSTableReader> sstables) + { + Set<SSTableReader> l1after = new HashSet<>(); + Set<SSTableReader> l0after = new HashSet<>(); - for (SSTableReader kept : sstables) ++ for (SSTableReader sstable : sstables) + { - switch (kept.getSSTableLevel()) ++ switch (sstable.getSSTableLevel()) + { + case 0: - l0after.add(kept); ++ l0after.add(sstable); + break; + case 1: - l1after.add(kept); ++ l1after.add(sstable); + break; + default: + throw new RuntimeException("only l0 & l1 sstables"); + } + } + return Pair.create(l0after, l1after); + } ++ } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org