Repository: cassandra Updated Branches: refs/heads/trunk 86f51fd4f -> 910170c9d
http://git-wip-us.apache.org/repos/asf/cassandra/blob/910170c9/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java new file mode 100644 index 0000000..ac12491 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import com.google.common.primitives.Longs; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; +import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter; +import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter; +import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; +import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.utils.ByteBufferUtil; +import static org.junit.Assert.assertEquals; + +public class CompactionAwareWriterTest +{ + private static String KEYSPACE1 = "CompactionAwareWriterTest"; + private static String CF = "Standard1"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF)); + + } + @Test + public void testDefaultCompactionWriter() + { + Keyspace ks = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = ks.getColumnFamilyStore(CF); + int rowCount = 1000; + cfs.disableAutoCompaction(); + populate(cfs, rowCount); + Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); + long beforeSize = sstables.iterator().next().onDiskLength(); + CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, sstables, sstables, false, OperationType.COMPACTION); + int rows = compact(cfs, sstables, writer); + assertEquals(1, cfs.getSSTables().size()); + assertEquals(rowCount, rows); + assertEquals(beforeSize, cfs.getSSTables().iterator().next().onDiskLength()); + validateData(cfs, rowCount); + cfs.truncateBlocking(); + } + + @Test + public void testMaxSSTableSizeWriter() + { + Keyspace ks = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = ks.getColumnFamilyStore(CF); + cfs.disableAutoCompaction(); + int rowCount = 1000; + populate(cfs, rowCount); + Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); + long beforeSize = sstables.iterator().next().onDiskLength(); + int sstableCount = (int)beforeSize/10; + CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, sstables, sstables, sstableCount, 0, false, OperationType.COMPACTION); + int rows = compact(cfs, sstables, writer); + assertEquals(10, cfs.getSSTables().size()); + assertEquals(rowCount, rows); + validateData(cfs, rowCount); + cfs.truncateBlocking(); + } + @Test + public void testSplittingSizeTieredCompactionWriter() + { + Keyspace ks = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = ks.getColumnFamilyStore(CF); + cfs.disableAutoCompaction(); + int rowCount = 10000; + populate(cfs, rowCount); + Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); + long beforeSize = sstables.iterator().next().onDiskLength(); + CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, sstables, sstables, OperationType.COMPACTION, 0); + int rows = compact(cfs, sstables, writer); + long expectedSize = beforeSize / 2; + List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getSSTables()); + + Collections.sort(sortedSSTables, new Comparator<SSTableReader>() + { + @Override + public int compare(SSTableReader o1, SSTableReader o2) + { + return Longs.compare(o2.onDiskLength(), o1.onDiskLength()); + } + }); + for (SSTableReader sstable : sortedSSTables) + { + assertEquals(expectedSize, sstable.onDiskLength(), 10000); + expectedSize /= 2; + } + assertEquals(rowCount, rows); + validateData(cfs, rowCount); + cfs.truncateBlocking(); + } + + @Test + public void testMajorLeveledCompactionWriter() + { + Keyspace ks = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = ks.getColumnFamilyStore(CF); + cfs.disableAutoCompaction(); + int rowCount = 10000; + populate(cfs, rowCount); + Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); + long beforeSize = sstables.iterator().next().onDiskLength(); + int sstableCount = (int)beforeSize/100; + CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, sstables, sstables, sstableCount, false, OperationType.COMPACTION); + int rows = compact(cfs, sstables, writer); + assertEquals(100, cfs.getSSTables().size()); + int [] levelCounts = new int[5]; + assertEquals(rowCount, rows); + for (SSTableReader sstable : cfs.getSSTables()) + { + levelCounts[sstable.getSSTableLevel()]++; + } + assertEquals(0, levelCounts[0]); + assertEquals(10, levelCounts[1]); + assertEquals(90, levelCounts[2]); + for (int i = 3; i < levelCounts.length; i++) + assertEquals(0, levelCounts[i]); + validateData(cfs, rowCount); + cfs.truncateBlocking(); + } + + private int compact(ColumnFamilyStore cfs, Set<SSTableReader> sstables, CompactionAwareWriter writer) + { + assert sstables.size() == 1; + int rowsWritten = 0; + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) + { + CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); + ISSTableScanner scanner = scanners.scanners.get(0); + while(scanner.hasNext()) + { + AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); + if (writer.append(row)) + rowsWritten++; + } + } + Collection<SSTableReader> newSSTables = writer.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newSSTables, OperationType.COMPACTION); + return rowsWritten; + } + + private void populate(ColumnFamilyStore cfs, int count) + { + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < count; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); + Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + for (int j = 0; j < 10; j++) + rm.add(CF, Util.cellname(Integer.toString(j)), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp); + rm.applyUnsafe(); + } + cfs.forceBlockingFlush(); + assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); + } + private void validateData(ColumnFamilyStore cfs, int rowCount) + { + for (int i = 0; i < rowCount; i++) + { + ColumnFamily cf = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(Util.dk(Integer.toString(i)), CF, System.currentTimeMillis()), Integer.MAX_VALUE); + Iterator<Cell> iter = cf.iterator(); + int cellCount = 0; + while (iter.hasNext()) + { + Cell c = iter.next(); + assertEquals(Util.cellname(Integer.toString(cellCount)), c.name()); + cellCount++; + } + assertEquals(10, cellCount); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/910170c9/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index 6db01d7..e5baab6 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -126,7 +126,7 @@ public class CompactionsPurgeTest cfs.forceBlockingFlush(); // major compact and test that all columns but the resurrected one is completely gone - FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE)); + FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE, false)); cfs.invalidateCachedRow(key); ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis())); assertColumns(cf, "5"); @@ -387,7 +387,7 @@ public class CompactionsPurgeTest assertEquals(0, result.size()); // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged - FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000)); + FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000, false)); // the data should be gone, but the tombstone should still exist assertEquals(1, cfs.getSSTables().size()); @@ -407,7 +407,7 @@ public class CompactionsPurgeTest cfs.forceBlockingFlush(); // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged - FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000)); + FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000, false)); // both the data and the tombstone should be gone this time assertEquals(0, cfs.getSSTables().size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/910170c9/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index b649ed6..c0417c0 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -180,7 +180,7 @@ public class CompactionsTest rm.applyUnsafe(); cfs.forceBlockingFlush(); - CompactionManager.instance.performMaximal(cfs); + CompactionManager.instance.performMaximal(cfs, false); assertEquals(1, cfs.getSSTables().size()); // check that the shadowed column is gone http://git-wip-us.apache.org/repos/asf/cassandra/blob/910170c9/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java index a2c4191..ec5c280 100644 --- a/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java @@ -76,7 +76,7 @@ public class OneCompactionTest store.forceBlockingFlush(); assertEquals(inserted.size(), Util.getRangeSlice(store).size()); } - CompactionManager.instance.performMaximal(store); + CompactionManager.instance.performMaximal(store, false); assertEquals(1, store.getSSTables().size()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/910170c9/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index c97cb71..13bdee9 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -122,7 +122,7 @@ public class SSTableReaderTest rm.applyUnsafe(); } store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store); + CompactionManager.instance.performMaximal(store, false); List<Range<Token>> ranges = new ArrayList<Range<Token>>(); // 1 key @@ -163,7 +163,7 @@ public class SSTableReaderTest rm.applyUnsafe(); } store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store); + CompactionManager.instance.performMaximal(store, false); // check that all our keys are found correctly SSTableReader sstable = store.getSSTables().iterator().next(); @@ -254,7 +254,7 @@ public class SSTableReaderTest rm.applyUnsafe(); } store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store); + CompactionManager.instance.performMaximal(store, false); SSTableReader sstable = store.getSSTables().iterator().next(); long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ).position; @@ -302,7 +302,7 @@ public class SSTableReaderTest rm.apply(); } store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store); + CompactionManager.instance.performMaximal(store, false); SSTableReader sstable = store.getSSTables().iterator().next(); sstable.getPosition(k(2), SSTableReader.Operator.EQ); @@ -426,7 +426,7 @@ public class SSTableReaderTest rm.applyUnsafe(); } store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store); + CompactionManager.instance.performMaximal(store, false); // construct a range which is present in the sstable, but whose // keys are not found in the first segment of the index. @@ -463,7 +463,7 @@ public class SSTableReaderTest rm.applyUnsafe(); } store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store); + CompactionManager.instance.performMaximal(store, false); Collection<SSTableReader> sstables = store.getSSTables(); assert sstables.size() == 1;
