This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e53ad6461224b1ab096f56d9e2c744126eb532cd
Merge: f0bb299 b58a5c8
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Thu Feb 17 10:30:05 2022 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java    |   6 +-
 .../db/compaction/LeveledCompactionTask.java       |  45 +++++-
 .../compaction/writers/CompactionAwareWriter.java  |   7 +-
 .../writers/MajorLeveledCompactionWriter.java      |   6 +
 .../compaction/writers/MaxSSTableSizeWriter.java   |   6 +
 .../SplittingSizeTieredCompactionWriter.java       |   8 +-
 .../compaction/LeveledCompactionStrategyTest.java  | 152 +++++++++++++++++++++
 .../{ => writers}/CompactionAwareWriterTest.java   |  48 ++++++-
 9 files changed, 267 insertions(+), 12 deletions(-)
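
For context, the change below lets each compaction writer report how large its next output is expected to be, so the disk-space check can use a per-writer estimate instead of the size of the whole compaction. A minimal sketch of the override pattern, with illustrative class names rather than the real Cassandra writers:

    // Sketch only: stand-in classes, not the actual compaction writers.
    abstract class WriterSketch
    {
        private final long expectedCompactedBytes;

        WriterSketch(long expectedCompactedBytes)
        {
            this.expectedCompactedBytes = expectedCompactedBytes;
        }

        // Default estimate: the full expected size of the compaction result.
        protected long getExpectedWriteSize()
        {
            return expectedCompactedBytes;
        }
    }

    // A size-capped writer only needs room for one max-sized sstable at a time.
    class CappedWriterSketch extends WriterSketch
    {
        private final long maxSSTableSize;

        CappedWriterSketch(long expectedCompactedBytes, long maxSSTableSize)
        {
            super(expectedCompactedBytes);
            this.maxSSTableSize = maxSSTableSize;
        }

        @Override
        protected long getExpectedWriteSize()
        {
            return Math.min(maxSSTableSize, super.getExpectedWriteSize());
        }
    }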

diff --cc CHANGES.txt
index 402048f,4d07a3d..513a8af
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,9 +1,22 @@@
 -3.0.27
 - * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272)
 +3.11.13
 +Merged from 3.0:
   * Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction (CASSANDRA-17273)
++ * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272)
  
  
 -3.0.26
 +3.11.12
 + * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA-17028)
 + * Add key validation to ssstablescrub (CASSANDRA-16969)
 + * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 + * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 + * Fix ant-junit dependency issue (CASSANDRA-16827)
 + * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 + * Avoid sending CDC column if not enabled (CASSANDRA-16770)
 +Merged from 3.0:
   * Fix conversion from megabits to bytes in streaming rate limiter (CASSANDRA-17243)
   * Upgrade logback to 1.2.9 (CASSANDRA-17204)
   * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673)
diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index bc6115e,1ceed1c..7ffe33a
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@@ -158,26 -124,9 +158,26 @@@ public abstract class CompactionAwareWr
       */
      protected void maybeSwitchWriter(DecoratedKey key)
      {
 -        if (!isInitialized)
 -            switchCompactionLocation(getDirectories().getWriteableLocation(getExpectedWriteSize()));
 -        isInitialized = true;
 +        if (diskBoundaries == null)
 +        {
 +            if (locationIndex < 0)
 +            {
-                 Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, OperationType.UNKNOWN));
++                Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, getExpectedWriteSize());
 +                switchCompactionLocation(defaultLocation);
 +                locationIndex = 0;
 +            }
 +            return;
 +        }
 +
 +        if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0)
 +            return;
 +
 +        int prevIdx = locationIndex;
 +        while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
 +            locationIndex++;
 +        if (prevIdx >= 0)
 +            logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex));
 +        switchCompactionLocation(locations.get(locationIndex));
      }
  
      /**
@@@ -198,45 -147,18 +198,50 @@@
  
      /**
       * Return a directory where we can expect expectedWriteSize to fit.
 +     *
 +     * @param sstables the sstables to compact
 +     * @return
       */
 -    public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
 +    public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
      {
 -        Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
 -        if (directory == null)
 -            throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
 +        Descriptor descriptor = null;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (descriptor == null)
 +                descriptor = sstable.descriptor;
 +            if (!descriptor.directory.equals(sstable.descriptor.directory))
 +            {
 +                logger.trace("All sstables not from the same disk - putting 
results in {}", descriptor.directory);
 +                break;
 +            }
 +        }
 +        Directories.DataDirectory d = 
getDirectories().getDataDirectoryForFile(descriptor);
 +        if (d != null)
 +        {
 +            long availableSpace = d.getAvailableSpace();
 +            if (availableSpace < estimatedWriteSize)
 +                throw new RuntimeException(String.format("Not enough space to 
write %s to %s (%s available)",
 +                                                         
FBUtilities.prettyPrintMemory(estimatedWriteSize),
 +                                                         d.location,
 +                                                         
FBUtilities.prettyPrintMemory(availableSpace)));
 +            logger.trace("putting compaction results in {}", 
descriptor.directory);
 +            return d;
 +        }
 +        d = getDirectories().getWriteableLocation(estimatedWriteSize);
 +        if (d == null)
 +            throw new RuntimeException(String.format("Not enough disk space 
to store %s",
 +                                                     
FBUtilities.prettyPrintMemory(estimatedWriteSize)));
 +        return d;
 +    }
  
 -        return directory;
 +    public CompactionAwareWriter setRepairedAt(long repairedAt)
 +    {
 +        this.sstableWriter.setRepairedAt(repairedAt);
 +        return this;
      }
+ 
+     protected long getExpectedWriteSize()
+     {
+         return cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
+     }
  }
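
The new getWriteDirectory() above boils down to: prefer the data directory the input sstables already live on, and fail fast with a readable message if the estimated output does not fit there. A compressed sketch of that check, where DataDir is an illustrative stand-in for Directories.DataDirectory rather than the real API:

    // Sketch only: DataDir stands in for Directories.DataDirectory.
    interface DataDir
    {
        long getAvailableSpace();
    }

    final class WriteDirectoryCheck
    {
        // Returns the preferred directory if the estimate fits, otherwise fails.
        // The real method also falls back to any other writeable data directory
        // before giving up.
        static DataDir check(DataDir preferred, long estimatedWriteSize)
        {
            long available = preferred.getAvailableSpace();
            if (available < estimatedWriteSize)
                throw new RuntimeException(String.format("Not enough space to write %d bytes (%d available)",
                                                         estimatedWriteSize, available));
            return preferred;
        }
    }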
diff --cc src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 0beb505,3eee398..f1326e9
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@@ -100,21 -96,23 +100,27 @@@ public class MajorLeveledCompactionWrit
  
      }
  
 -    public void switchCompactionLocation(Directories.DataDirectory directory)
 +    @Override
 +    public void switchCompactionLocation(Directories.DataDirectory location)
      {
 -        File sstableDirectory = getDirectories().getLocationForDisk(directory);
 -        @SuppressWarnings("resource")
 -        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
 -                                                    averageEstimatedKeysPerSSTable,
 -                                                    minRepairedAt,
 -                                                    cfs.metadata,
 -                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
 -                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
 -                                                    txn);
 -        sstableWriter.switchWriter(writer);
 +        this.sstableDirectory = location;
 +        averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
 +        sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
 +                keysPerSSTable,
 +                minRepairedAt,
 +                cfs.metadata,
 +                new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel),
 +                SerializationHeader.make(cfs.metadata, txn.originals()),
 +                cfs.indexManager.listIndexes(),
 +                txn));
 +        partitionsWritten = 0;
 +        sstablesWritten = 0;
 +
      }
+ 
+     @Override
+     protected long getExpectedWriteSize()
+     {
 -        return expectedWriteSize;
++        return Math.min(maxSSTableSize, super.getExpectedWriteSize());
+     }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 864185e,d76381a..36c69b0
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@@ -118,5 -103,12 +118,11 @@@ public class MaxSSTableSizeWriter exten
                                                      txn);
  
          sstableWriter.switchWriter(writer);
 -
      }
+ 
+     @Override
+     protected long getExpectedWriteSize()
+     {
 -        return expectedWriteSize;
++        return Math.min(maxSSTableSize, super.getExpectedWriteSize());
+     }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 46cb891,77672d8..b4d7097
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@@ -88,12 -92,11 +88,12 @@@ public class SplittingSizeTieredCompact
      public boolean realAppend(UnfilteredRowIterator partition)
      {
          RowIndexEntry rie = sstableWriter.append(partition);
 -        if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
 +        if (sstableWriter.currentWriter().getEstimatedOnDiskBytesWritten() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
          {
              currentRatioIndex++;
-             currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
+             currentBytesToWrite = getExpectedWriteSize();
 -            switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
 +            switchCompactionLocation(location);
 +            logger.debug("Switching writer, currentBytesToWrite = {}", 
currentBytesToWrite);
          }
          return rie != null;
      }
@@@ -114,5 -114,11 +114,11 @@@
                                                      txn);
          logger.trace("Switching writer, currentPartitionsToWrite = {}", 
currentPartitionsToWrite);
          sstableWriter.switchWriter(writer);
 -
      }
+ 
++    @Override
+     protected long getExpectedWriteSize()
+     {
+         return Math.round(totalSize * ratios[currentRatioIndex]);
+     }
  }
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index eba243f,5bbc931..91b9b3b
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -28,16 -29,17 +29,21 @@@ import java.util.Iterator
  import java.util.List;
  import java.util.Map;
  import java.util.Random;
+ import java.util.Set;
  import java.util.UUID;
 +import java.util.stream.Collectors;
  
 -import com.google.common.collect.Iterables;
 -import com.google.common.collect.Sets;
 +import junit.framework.Assert;
  import org.junit.After;
 -import org.junit.Assert;
  import org.junit.Before;
  import org.junit.BeforeClass;
++
++import com.google.common.collect.Iterables;
++import com.google.common.collect.Sets;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.junit.Test;
  import org.junit.runner.RunWith;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -62,11 -64,12 +68,14 @@@ import org.apache.cassandra.schema.Comp
  import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
  
 +import static java.util.Collections.singleton;
 +import static org.assertj.core.api.Assertions.assertThat;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
  
  @RunWith(OrderedJUnit4ClassRunner.class)
@@@ -376,351 -380,146 +385,494 @@@ public class LeveledCompactionStrategyT
          assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
      }
  
 +
 +
 +    @Test
 +    public void testTokenRangeCompaction() throws Exception
 +    {
 +        // Remove any existing data so we can start out clean with predictable number of sstables
 +        cfs.truncateBlocking();
 +
 +        // Disable auto compaction so cassandra does not compact
 +        CompactionManager.instance.disableAutoCompaction();
 +
 +        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
 +
 +        DecoratedKey key1 = Util.dk(String.valueOf(1));
 +        DecoratedKey key2 = Util.dk(String.valueOf(2));
 +        List<DecoratedKey> keys = new ArrayList<>(Arrays.asList(key1, key2));
 +        int numIterations = 10;
 +        int columns = 2;
 +
 +        // Add enough data to trigger multiple sstables.
 +
 +        // create 10 sstables that contain data for both key1 and key2
 +        for (int i = 0; i < numIterations; i++) {
 +            for (DecoratedKey key : keys) {
 +                UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
 +                for (int c = 0; c < columns; c++)
 +                    update.newRow("column" + c).add("val", value);
 +                update.applyUnsafe();
 +            }
 +            cfs.forceBlockingFlush();
 +        }
 +
 +        // create 20 more sstables with 10 containing data for key1 and other 10 containing data for key2
 +        for (int i = 0; i < numIterations; i++) {
 +            for (DecoratedKey key : keys) {
 +                UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
 +                for (int c = 0; c < columns; c++)
 +                    update.newRow("column" + c).add("val", value);
 +                update.applyUnsafe();
 +                cfs.forceBlockingFlush();
 +            }
 +        }
 +
 +        // We should have a total of 30 sstables by now
 +        assertEquals(30, cfs.getLiveSSTables().size());
 +
 +        // Compact just the tables with key2
 +        // Bit hackish to use the key1.token as the prior key but works in BytesToken
 +        Range<Token> tokenRange = new Range<>(key2.getToken(), key2.getToken());
 +        Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange));
 +        cfs.forceCompactionForTokenRange(tokenRanges);
 +
 +        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
 +            Thread.sleep(100);
 +        }
 +
 +        // 20 tables that have key2 should have been compacted in to 1 table resulting in 11 (30-20+1)
 +        assertEquals(11, cfs.getLiveSSTables().size());
 +
 +        // Compact just the tables with key1. At this point all 11 tables should have key1
 +        Range<Token> tokenRange2 = new Range<>(key1.getToken(), key1.getToken());
 +        Collection<Range<Token>> tokenRanges2 = new ArrayList<>(Arrays.asList(tokenRange2));
 +        cfs.forceCompactionForTokenRange(tokenRanges2);
 +
 +
 +        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
 +            Thread.sleep(100);
 +        }
 +
 +        // the 11 tables containing key1 should all compact to 1 table
 +        assertEquals(1, cfs.getLiveSSTables().size());
 +    }
 +
 +    @Test
 +    public void testCompactionCandidateOrdering() throws Exception
 +    {
 +        // add some data
 +        byte [] b = new byte[100 * 1024];
 +        new Random().nextBytes(b);
 +        ByteBuffer value = ByteBuffer.wrap(b);
 +        int rows = 4;
 +        int columns = 10;
 +        // Just keep sstables in L0 for this test
 +        cfs.disableAutoCompaction();
 +        for (int r = 0; r < rows; r++)
 +        {
 +            UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
 +            for (int c = 0; c < columns; c++)
 +                update.newRow("column" + c).add("val", value);
 +            update.applyUnsafe();
 +            cfs.forceBlockingFlush();
 +        }
 +        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0);
 +        // get readers for level 0 sstables
 +        Collection<SSTableReader> sstables = strategy.manifest.getLevel(0);
 +        Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables);
 +        assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1);
 +        long lastMaxTimeStamp = Long.MIN_VALUE;
 +        for (SSTableReader sstable : sortedCandidates)
 +        {
 +            assertTrue(String.format("SStables not sorted into oldest to 
newest by maxTimestamp. Current sstable: %d , last sstable: %d", 
sstable.getMaxTimestamp(), lastMaxTimeStamp),
 +                       sstable.getMaxTimestamp() > lastMaxTimeStamp);
 +            lastMaxTimeStamp = sstable.getMaxTimestamp();
 +        }
 +    }
 +
 +    @Test
 +    public void testAddingOverlapping()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
 +        List<SSTableReader> currentLevel = new ArrayList<>();
 +        int gen = 1;
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs));
 +
 +        lm.addSSTables(currentLevel);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5));
 +
 +        List<SSTableReader> newSSTables = new ArrayList<>();
 +        // this sstable last token is the same as the first token of L1 above, should get sent to L0:
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertEquals(0, newSSTables.get(0).getSSTableLevel());
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +
 +        newSSTables.clear();
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertEquals(0, newSSTables.get(0).getSSTableLevel());
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +
 +        newSSTables.clear();
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertEquals(0, newSSTables.get(0).getSSTableLevel());
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +
 +        newSSTables.clear();
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 3));
 +        newL1.add(newSSTables.get(1));
 +        assertLevelsEqual(lm.getLevel(1), newL1);
 +        newSSTables.remove(1);
 +        assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 0));
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +    }
 +
 +    @Test
 +    public void singleTokenSSTableTest()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
 +        List<SSTableReader> expectedL1 = new ArrayList<>();
 +
 +        int gen = 1;
 +        // single sstable, single token (100)
 +        expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
 +        lm.addSSTables(expectedL1);
 +
 +        List<SSTableReader> expectedL0 = new ArrayList<>();
 +
 +        // should get moved to L0:
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs));
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs));
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs));
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
 +        lm.addSSTables(expectedL0);
 +
 +        assertLevelsEqual(expectedL0, lm.getLevel(0));
 +        assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 0));
 +        assertLevelsEqual(expectedL1, lm.getLevel(1));
 +        assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 1));
 +
 +        // should work:
 +        expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs));
 +        expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs));
 +        lm.addSSTables(expectedL1.subList(1, expectedL1.size()));
 +        assertLevelsEqual(expectedL1, lm.getLevel(1));
 +    }
 +
 +    @Test
 +    public void randomMultiLevelAddTest()
 +    {
 +        int iterations = 100;
 +        int levelCount = 9;
 +
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
 +        long seed = System.currentTimeMillis();
 +        Random r = new Random(seed);
 +        List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r);
 +
 +        int sstableCount = newLevels.size();
 +        lm.addSSTables(newLevels);
 +
 +        int [] expectedLevelSizes = lm.getAllLevelSize();
 +
 +        for (int j = 0; j < iterations; j++)
 +        {
 +            newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r);
 +            sstableCount += newLevels.size();
 +
 +            int[] canAdd = canAdd(lm, newLevels, levelCount);
 +            for (int i = 0; i < levelCount; i++)
 +                expectedLevelSizes[i] += canAdd[i];
 +            lm.addSSTables(newLevels);
 +        }
 +
 +        // and verify no levels overlap
 +        int actualSSTableCount = 0;
 +        for (int i = 0; i < levelCount; i++)
 +        {
 +            actualSSTableCount += lm.getLevelSize(i);
 +            List<SSTableReader> level = new ArrayList<>(lm.getLevel(i));
 +            int lvl = i;
 +            assertTrue(level.stream().allMatch(s -> s.getSSTableLevel() == lvl));
 +            if (i > 0)
 +            {
 +                level.sort(SSTableReader.sstableComparator);
 +                SSTableReader prev = null;
 +                for (SSTableReader sstable : level)
 +                {
 +                    if (prev != null && sstable.first.compareTo(prev.last) <= 0)
 +                    {
 +                        String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", "));
 +                        String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr);
 +                        Assert.fail("[seed = "+seed+"] overlap in level "+lvl+": " + overlap);
 +                    }
 +                    prev = sstable;
 +                }
 +            }
 +        }
 +        assertEquals(sstableCount, actualSSTableCount);
 +        for (int i = 0; i < levelCount; i++)
 +            assertEquals("[seed = " + seed + "] wrong sstable count in level 
= " + i, expectedLevelSizes[i], lm.getLevel(i).size());
 +    }
 +
 +    private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r)
 +    {
 +        List<SSTableReader> newLevels = new ArrayList<>();
 +        for (int level = 0; level < levelCount; level++)
 +        {
 +            int numLevelSSTables = r.nextInt(maxSSTableCountPerLevel) + 1;
 +            List<Integer> tokens = new ArrayList<>(numLevelSSTables * 2);
 +
 +            for (int i = 0; i < numLevelSSTables * 2; i++)
 +                tokens.add(r.nextInt(4000));
 +            Collections.sort(tokens);
 +            for (int i = 0; i < tokens.size() - 1; i += 2)
 +            {
 +                SSTableReader sstable = MockSchema.sstableWithLevel(++startGen, tokens.get(i), tokens.get(i + 1), level, cfs);
 +                newLevels.add(sstable);
 +            }
 +        }
 +        return newLevels;
 +    }
 +
 +    /**
 +     * brute-force checks if the new sstables can be added to the correct level in manifest
 +     *
 +     * @return count of expected sstables to add to each level
 +     */
 +    private static int[] canAdd(LeveledManifest lm, List<SSTableReader> newSSTables, int levelCount)
 +    {
 +        Map<Integer, Collection<SSTableReader>> sstableGroups = new HashMap<>();
 +        newSSTables.forEach(s -> sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new ArrayList<>()).add(s));
 +
 +        int[] canAdd = new int[levelCount];
 +        for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : sstableGroups.entrySet())
 +        {
 +            int level = lvlGroup.getKey();
 +            if (level == 0)
 +            {
 +                canAdd[0] += lvlGroup.getValue().size();
 +                continue;
 +            }
 +
 +            List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level));
 +            for (SSTableReader sstable : lvlGroup.getValue())
 +            {
 +                newLevel.add(sstable);
 +                newLevel.sort(SSTableReader.sstableComparator);
 +
 +                SSTableReader prev = null;
 +                boolean kept = true;
 +                for (SSTableReader sst : newLevel)
 +                {
 +                    if (prev != null && prev.last.compareTo(sst.first) >= 0)
 +                    {
 +                        newLevel.remove(sstable);
 +                        kept = false;
 +                        break;
 +                    }
 +                    prev = sst;
 +                }
 +                if (kept)
 +                    canAdd[level] += 1;
 +                else
 +                    canAdd[0] += 1;
 +            }
 +        }
 +        return canAdd;
 +    }
 +
 +    private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2)
 +    {
 +        assertEquals(l1.size(), l2.size());
 +        assertEquals(new HashSet<>(l1), new HashSet<>(l2));
 +    }
 +
 +    @Test
 +    public void testHighestLevelHasMoreDataThanSupported()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        int fanoutSize = 2; // to generate less sstables
 +        LeveledManifest lm = new LeveledManifest(cfs, 1, fanoutSize, new SizeTieredCompactionStrategyOptions());
 +
 +        // generate data for L7 to trigger compaction
 +        int l7 = 7;
 +        int maxBytesForL7 = (int) (Math.pow(fanoutSize, l7) * 1024 * 1024);
 +        int sstablesSizeForL7 = (int) (maxBytesForL7 * 1.001) + 1;
 +        List<SSTableReader> sstablesOnL7 = Collections.singletonList(MockSchema.sstableWithLevel( 1, sstablesSizeForL7, l7, cfs));
 +        lm.addSSTables(sstablesOnL7);
 +
 +        // generate data for L8 to trigger compaction
 +        int l8 = 8;
 +        int maxBytesForL8 = (int) (Math.pow(fanoutSize, l8) * 1024 * 1024);
 +        int sstablesSizeForL8 = (int) (maxBytesForL8 * 1.001) + 1;
 +        List<SSTableReader> sstablesOnL8 = Collections.singletonList(MockSchema.sstableWithLevel( 2, sstablesSizeForL8, l8, cfs));
 +        lm.addSSTables(sstablesOnL8);
 +
 +        // compaction for L8 sstables is not supposed to be run because there is no upper level to promote sstables
 +        // that's why we expect compaction candidates for L7 only
 +        Collection<SSTableReader> compactionCandidates = lm.getCompactionCandidates().sstables;
 +        assertThat(compactionCandidates).containsAll(sstablesOnL7);
 +        assertThat(compactionCandidates).doesNotContainAnyElementsOf(sstablesOnL8);
 +    }
++
+     @Test
+     public void testReduceScopeL0L1() throws IOException
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "LeveledCompactionStrategy");
+         localOptions.put("sstable_size_in_mb", "1");
+         cfs.setCompactionParameters(localOptions);
+         List<SSTableReader> l1sstables = new ArrayList<>();
+         for (int i = 0; i < 10; i++)
+         {
+             SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs);
+             l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1);
+             l1sstable.reloadSSTableMetadata();
+             l1sstables.add(l1sstable);
+         }
+         List<SSTableReader> l0sstables = new ArrayList<>();
+         for (int i = 10; i < 20; i++)
+             l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
 -
+         try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, Iterables.concat(l0sstables, l1sstables)))
+         {
+             CompactionTask task = new LeveledCompactionTask(cfs, txn, 1, 0, 1024*1024, false);
+             SSTableReader lastRemoved = null;
+             boolean removed = true;
+             for (int i = 0; i < l0sstables.size(); i++)
+             {
+                 Set<SSTableReader> before = new HashSet<>(txn.originals());
+                 removed = task.reduceScopeForLimitedSpace(0);
 -                SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null);
++                SSTableReader removedSSTable = Iterables.getOnlyElement(Sets.difference(before, txn.originals()), null);
+                 if (removed)
+                 {
+                     assertNotNull(removedSSTable);
+                     assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
+                     assertEquals(0, removedSSTable.getSSTableLevel());
+                     Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                     Set<SSTableReader> l1after = sstables.right;
+ 
+                     assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
+                     assertEquals(before.size() - 1, txn.originals().size());
+                     lastRemoved = removedSSTable;
+                 }
+                 else
+                 {
+                     assertNull(removedSSTable);
+                     Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                     Set<SSTableReader> l0after = sstables.left;
+                     Set<SSTableReader> l1after = sstables.right;
+                     assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
+                     assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
+                 }
+             }
+             assertFalse(removed);
+         }
+     }
+ 
+     @Test
+     public void testReduceScopeL0()
+     {
+ 
+         List<SSTableReader> l0sstables = new ArrayList<>();
+         for (int i = 10; i < 20; i++)
+             l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
+ 
+         try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, l0sstables))
+         {
+             CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024*1024, false);
+ 
+             SSTableReader lastRemoved = null;
+             boolean removed = true;
+             for (int i = 0; i < l0sstables.size(); i++)
+             {
+                 Set<SSTableReader> before = new HashSet<>(txn.originals());
+                 removed = task.reduceScopeForLimitedSpace(0);
+                 SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null);
+                 if (removed)
+                 {
+                     assertNotNull(removedSSTable);
+                     assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
+                     assertEquals(0, removedSSTable.getSSTableLevel());
+                     assertEquals(before.size() - 1, txn.originals().size());
+                     lastRemoved = removedSSTable;
+                 }
+                 else
+                 {
+                     assertNull(removedSSTable);
+                     Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                     Set<SSTableReader> l0after = sstables.left;
+                     assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
+                 }
+             }
+             assertFalse(removed);
+         }
+     }
+ 
+     @Test
+     public void testNoHighLevelReduction() throws IOException
+     {
+         List<SSTableReader> sstables = new ArrayList<>();
+         int i = 1;
+         for (; i < 5; i++)
+         {
+             SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
+             sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1);
+             sstable.reloadSSTableMetadata();
+             sstables.add(sstable);
+         }
+         for (; i < 10; i++)
+         {
+             SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
+             sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2);
+             sstable.reloadSSTableMetadata();
+             sstables.add(sstable);
+         }
+         try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables))
+         {
+             CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024 * 1024, false);
+             assertFalse(task.reduceScopeForLimitedSpace(0));
+             assertEquals(new HashSet<>(sstables), txn.originals());
+         }
+     }
+ 
+     private Pair<Set<SSTableReader>, Set<SSTableReader>> groupByLevel(Iterable<SSTableReader> sstables)
+     {
+         Set<SSTableReader> l1after = new HashSet<>();
+         Set<SSTableReader> l0after = new HashSet<>();
 -        for (SSTableReader kept : sstables)
++        for (SSTableReader sstable : sstables)
+         {
 -            switch (kept.getSSTableLevel())
++            switch (sstable.getSSTableLevel())
+             {
+                 case 0:
 -                    l0after.add(kept);
++                    l0after.add(sstable);
+                     break;
+                 case 1:
 -                    l1after.add(kept);
++                    l1after.add(sstable);
+                     break;
+                 default:
+                     throw new RuntimeException("only l0 & l1 sstables");
+             }
+         }
+         return Pair.create(l0after, l1after);
+     }
++
  }
