Updated Branches: refs/heads/trunk 907a6ddf1 -> 9d58b7158
move long LCS test to long-test Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d58b715 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d58b715 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d58b715 Branch: refs/heads/trunk Commit: 9d58b71583b4de7228a1cc106f02ecdad8778efd Parents: 907a6dd Author: Yuki Morishita <[email protected]> Authored: Thu Oct 11 09:51:00 2012 -0500 Committer: Yuki Morishita <[email protected]> Committed: Thu Oct 11 09:51:00 2012 -0500 ---------------------------------------------------------------------- .../LongLeveledCompactionStrategyTest.java | 129 +++++++++++++++ .../compaction/LeveledCompactionStrategyTest.java | 96 ----------- 2 files changed, 129 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d58b715/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java new file mode 100644 index 0000000..0ba9d7c --- /dev/null +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +public class LongLeveledCompactionStrategyTest extends SchemaLoader +{ + @Test + public void testParallelLeveledCompaction() throws Exception + { + String ksname = "Keyspace1"; + String cfname = "StandardLeveled"; + Table table = Table.open(ksname); + ColumnFamilyStore store = table.getColumnFamilyStore(cfname); + store.disableAutoCompaction(); + + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategy(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files + + // Enough data to have a level 1 and 2 + int rows = 128; + int columns = 10; + + // Adds enough data to trigger multiple sstable per level + for (int r = 0; r < rows; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + RowMutation rm = new RowMutation(ksname, key.key); + for (int c = 0; c < columns; c++) + { + rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes("column" + c)), value, 0); + } + rm.apply(); + store.forceBlockingFlush(); + } + + // Execute LCS in parallel + ExecutorService executor = new ThreadPoolExecutor(4, 4, + Long.MAX_VALUE, TimeUnit.SECONDS, + new LinkedBlockingDeque<Runnable>()); + List<Runnable> tasks = new ArrayList<Runnable>(); + while (true) + { + while (true) + { + final AbstractCompactionTask t = lcs.getMaximalTask(Integer.MIN_VALUE); + if (t == null) + break; + tasks.add(new Runnable() + { + public void run() + { + try + { + t.execute(null); + } + finally + { + t.unmarkSSTables(); + } + } + }); + } + if (tasks.isEmpty()) + break; + + List<Future<?>> futures = new ArrayList<Future<?>>(tasks.size()); + for (Runnable r : tasks) + futures.add(executor.submit(r)); + FBUtilities.waitOnFutures(futures); + + tasks.clear(); + } + + // Assert all SSTables are lined up correctly. + LeveledManifest manifest = lcs.manifest; + int levels = manifest.getLevelCount(); + for (int level = 0; level < levels; level++) + { + List<SSTableReader> sstables = manifest.getLevel(level); + // score check + assert (double) SSTable.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00; + // overlap check for levels greater than 0 + if (level > 0) + { + for (SSTableReader sstable : sstables) + { + Set<SSTableReader> overlaps = LeveledManifest.overlapping(sstable, sstables); + assert overlaps.size() == 1 && overlaps.contains(sstable); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d58b715/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index f407873..9a246c5 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -18,11 +18,6 @@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.concurrent.*; import org.junit.Test; @@ -35,9 +30,6 @@ import org.apache.cassandra.db.Table; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.service.AntiEntropyService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -91,92 +83,4 @@ public class LeveledCompactionStrategyTest extends SchemaLoader AntiEntropyService.Validator validator = new AntiEntropyService.Validator(req); CompactionManager.instance.submitValidation(store, validator).get(); } - - @Test - public void testParallelLeveledCompaction() throws Exception - { - String ksname = "Keyspace1"; - String cfname = "StandardLeveled"; - Table table = Table.open(ksname); - ColumnFamilyStore store = table.getColumnFamilyStore(cfname); - store.disableAutoCompaction(); - - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategy(); - - ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files - - // Enough data to have a level 1 and 2 - int rows = 128; - int columns = 10; - - // Adds enough data to trigger multiple sstable per level - for (int r = 0; r < rows; r++) - { - DecoratedKey key = Util.dk(String.valueOf(r)); - RowMutation rm = new RowMutation(ksname, key.key); - for (int c = 0; c < columns; c++) - { - rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes("column" + c)), value, 0); - } - rm.apply(); - store.forceBlockingFlush(); - } - - // Execute LCS in parallel - ExecutorService executor = new ThreadPoolExecutor(4, 4, - Long.MAX_VALUE, TimeUnit.SECONDS, - new LinkedBlockingDeque<Runnable>()); - List<Runnable> tasks = new ArrayList<Runnable>(); - while (true) - { - while (true) - { - final AbstractCompactionTask t = lcs.getMaximalTask(Integer.MIN_VALUE); - if (t == null) - break; - tasks.add(new Runnable() - { - public void run() - { - try - { - t.execute(null); - } - finally - { - t.unmarkSSTables(); - } - } - }); - } - if (tasks.isEmpty()) - break; - - List<Future<?>> futures = new ArrayList<Future<?>>(tasks.size()); - for (Runnable r : tasks) - futures.add(executor.submit(r)); - FBUtilities.waitOnFutures(futures); - - tasks.clear(); - } - - // Assert all SSTables are lined up correctly. - LeveledManifest manifest = lcs.manifest; - int levels = manifest.getLevelCount(); - for (int level = 0; level < levels; level++) - { - List<SSTableReader> sstables = manifest.getLevel(level); - // score check - assert (double) SSTable.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00; - // overlap check for levels greater than 0 - if (level > 0) - { - for (SSTableReader sstable : sstables) - { - Set<SSTableReader> overlaps = LeveledManifest.overlapping(sstable, sstables); - assert overlaps.size() == 1 && overlaps.contains(sstable); - } - } - } - } }
