http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/HeapPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java index 3d3ca09..19f81be 100644 --- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java +++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -40,63 +39,66 @@ public class HeapPool extends MemtablePool public MemtableAllocator newAllocator() { - return new Allocator(this); + // TODO + throw new UnsupportedOperationException(); + //return new Allocator(this); } - public static class Allocator extends MemtableBufferAllocator - { - Allocator(HeapPool pool) - { - super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator()); - } + // TODO + //public static class Allocator extends MemtableBufferAllocator + //{ + // Allocator(HeapPool pool) + // { + // super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator()); + // } - public ByteBuffer allocate(int size, OpOrder.Group opGroup) - { - super.onHeap().allocate(size, opGroup); - return ByteBuffer.allocate(size); - } + // public ByteBuffer allocate(int size, OpOrder.Group opGroup) + // { + // super.onHeap().allocate(size, opGroup); + // return ByteBuffer.allocate(size); + // } - public DataReclaimer reclaimer() - { - return new Reclaimer(); - } + // public DataReclaimer reclaimer() + // { + // return new Reclaimer(); + // } - private class Reclaimer implements DataReclaimer - { - List<Cell> delayed; + // private class Reclaimer implements DataReclaimer + // { + // List<Cell> delayed; - public Reclaimer reclaim(Cell cell) - { - if (delayed == null) - delayed = new ArrayList<>(); - delayed.add(cell); - return this; - } + // public Reclaimer reclaim(Cell cell) + // { + // if (delayed == null) + // delayed = new ArrayList<>(); + // delayed.add(cell); + // return this; + // } - public Reclaimer reclaimImmediately(Cell cell) - { - onHeap().release(cell.name().dataSize() + cell.value().remaining()); - return this; - } + // public Reclaimer reclaimImmediately(Cell cell) + // { + // onHeap().release(cell.name().dataSize() + cell.value().remaining()); + // return this; + // } - public Reclaimer reclaimImmediately(DecoratedKey key) - { - onHeap().release(key.getKey().remaining()); - return this; - } + // public Reclaimer reclaimImmediately(DecoratedKey key) + // { + // onHeap().release(key.getKey().remaining()); + // return this; + // } - public void cancel() - { - if (delayed != null) - delayed.clear(); - } + // public void cancel() + // { + // if (delayed != null) + // delayed.clear(); + // } - public void commit() - { - if (delayed != null) - for (Cell cell : delayed) - reclaimImmediately(cell); - } - } - } + // public void commit() + // { + // if (delayed != null) + // for (Cell cell : delayed) + // reclaimImmediately(cell); + // } + // } + //} }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java index e814b4d..1e0c11e 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -58,10 +59,8 @@ public abstract class MemtableAllocator this.offHeap = offHeap; } - public abstract Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp); - public abstract CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp); - public abstract DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp); - public abstract ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp); + public abstract MemtableRowData.ReusableRow newReusableRow(); + public abstract RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp); public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup); public abstract DataReclaimer reclaimer(); @@ -104,10 +103,16 @@ public abstract class MemtableAllocator return state == LifeCycle.LIVE; } + public static interface RowAllocator extends Row.Writer + { + public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic); + public MemtableRowData allocatedRowData(); + } + public static interface DataReclaimer { - public DataReclaimer reclaim(Cell cell); - public DataReclaimer reclaimImmediately(Cell cell); + public DataReclaimer reclaim(MemtableRowData row); + public DataReclaimer reclaimImmediately(MemtableRowData row); public DataReclaimer reclaimImmediately(DecoratedKey key); public void cancel(); public void commit(); @@ -115,12 +120,12 @@ public abstract class MemtableAllocator public static final DataReclaimer NO_OP = new DataReclaimer() { - public DataReclaimer reclaim(Cell cell) + public DataReclaimer reclaim(MemtableRowData update) { return this; } - public DataReclaimer reclaimImmediately(Cell cell) + public DataReclaimer reclaimImmediately(MemtableRowData update) { return this; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java index 7034d76..144f439 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java @@ -20,12 +20,9 @@ package org.apache.cassandra.utils.memory; import java.nio.ByteBuffer; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.BufferDecoratedKey; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.CounterCell; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletedCell; -import org.apache.cassandra.db.ExpiringCell; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.utils.concurrent.OpOrder; public abstract class MemtableBufferAllocator extends MemtableAllocator @@ -36,24 +33,14 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator super(onHeap, offHeap); } - public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp) + public MemtableRowData.ReusableRow newReusableRow() { - return cell.localCopy(cfm, allocator(writeOp)); + return MemtableRowData.BufferRowData.createReusableRow(); } - public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp) + public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp) { - return cell.localCopy(cfm, allocator(writeOp)); - } - - public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp) - { - return cell.localCopy(cfm, allocator(writeOp)); - } - - public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp) - { - return cell.localCopy(cfm, allocator(writeOp)); + return new RowBufferAllocator(allocator(writeOp), cfm.isCounter()); } public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp) @@ -67,4 +54,71 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator { return new ContextAllocator(writeOp, this); } + + private static class RowBufferAllocator extends RowDataBlock.Writer implements RowAllocator + { + private final AbstractAllocator allocator; + private final boolean isCounter; + + private MemtableRowData.BufferClustering clustering; + private int clusteringIdx; + private LivenessInfo info; + private DeletionTime deletion; + private RowDataBlock data; + + private RowBufferAllocator(AbstractAllocator allocator, boolean isCounter) + { + super(true); + this.allocator = allocator; + this.isCounter = isCounter; + } + + public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic) + { + data = new RowDataBlock(columns, 1, false, isCounter); + clustering = isStatic ? null : new MemtableRowData.BufferClustering(clusteringSize); + clusteringIdx = 0; + updateWriter(data); + } + + public MemtableRowData allocatedRowData() + { + MemtableRowData row = new MemtableRowData.BufferRowData(clustering == null ? Clustering.STATIC_CLUSTERING : clustering, + info, + deletion, + data); + + clustering = null; + info = LivenessInfo.NONE; + deletion = DeletionTime.LIVE; + data = null; + + return row; + } + + public void writeClusteringValue(ByteBuffer value) + { + clustering.setClusteringValue(clusteringIdx++, value == null ? null : allocator.clone(value)); + } + + public void writePartitionKeyLivenessInfo(LivenessInfo info) + { + this.info = info; + } + + public void writeRowDeletion(DeletionTime deletion) + { + this.deletion = deletion; + } + + @Override + public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) + { + ByteBuffer v = allocator.clone(value); + if (column.isComplex()) + complexWriter.addCell(column, v, info, MemtableRowData.BufferCellPath.clone(path, allocator)); + else + simpleWriter.addCell(column, v, info); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java index 88846c5..7ca859d 100644 --- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java @@ -25,16 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.CounterCell; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletedCell; -import org.apache.cassandra.db.ExpiringCell; -import org.apache.cassandra.db.NativeCell; -import org.apache.cassandra.db.NativeCounterCell; import org.apache.cassandra.db.NativeDecoratedKey; -import org.apache.cassandra.db.NativeDeletedCell; -import org.apache.cassandra.db.NativeExpiringCell; +import org.apache.cassandra.db.rows.MemtableRowData; import org.apache.cassandra.utils.concurrent.OpOrder; public class NativeAllocator extends MemtableAllocator @@ -60,28 +53,16 @@ public class NativeAllocator extends MemtableAllocator super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator()); } - @Override - public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp) - { - return new NativeCell(this, writeOp, cell); - } - - @Override - public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp) + public MemtableRowData.ReusableRow newReusableRow() { - return new NativeCounterCell(this, writeOp, cell); + // TODO + throw new UnsupportedOperationException(); } - @Override - public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp) - { - return new NativeDeletedCell(this, writeOp, cell); - } - - @Override - public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp) + public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp) { - return new NativeExpiringCell(this, writeOp, cell); + // TODO + throw new UnsupportedOperationException(); } public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/burn/org/apache/cassandra/utils/LongBTreeTest.java ---------------------------------------------------------------------- diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java deleted file mode 100644 index 9641930..0000000 --- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java +++ /dev/null @@ -1,502 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cassandra.utils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Random; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import org.junit.Assert; -import org.junit.Test; - - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.utils.btree.BTree; -import org.apache.cassandra.utils.btree.BTreeSearchIterator; -import org.apache.cassandra.utils.btree.BTreeSet; -import org.apache.cassandra.utils.btree.UpdateFunction; - -// TODO : should probably lower fan-factor for tests to make them more intensive -public class LongBTreeTest -{ - - private static final MetricRegistry metrics = new MetricRegistry(); - private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE")); - private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE")); - private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY")); - private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE")); - private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f); - - static - { - System.setProperty("cassandra.btree.fanfactor", "4"); - } - - @Test - public void testOversizedMiddleInsert() - { - TreeSet<Integer> canon = new TreeSet<>(); - for (int i = 0 ; i < 10000000 ; i++) - canon.add(i); - Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null); - btree = BTree.update(btree, ICMP, canon, true); - canon.add(Integer.MIN_VALUE); - canon.add(Integer.MAX_VALUE); - Assert.assertTrue(BTree.isWellFormed(btree, ICMP)); - testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator()); - } - - @Test - public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 50, 1, 1, true); - } - - @Test - public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 50, 1, 5, true); - } - - @Test - public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 500, 10, 1, true); - } - - @Test - public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 500, 10, 10, true); - } - - @Test - public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException - { - testInsertions(100000000, 5000, 3, 100, true); - } - - @Test - public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException - { - testInsertions(10000, 50, 10, 10, false); - } - - @Test - public void testSearchIterator() throws InterruptedException - { - int threads = Runtime.getRuntime().availableProcessors(); - final CountDownLatch latch = new CountDownLatch(threads); - final AtomicLong errors = new AtomicLong(); - final AtomicLong count = new AtomicLong(); - final int perThreadTrees = 100; - final int perTreeSelections = 100; - final long totalCount = threads * perThreadTrees * perTreeSelections; - for (int t = 0 ; t < threads ; t++) - { - MODIFY.execute(new Runnable() - { - public void run() - { - ThreadLocalRandom random = ThreadLocalRandom.current(); - for (int i = 0 ; i < perThreadTrees ; i++) - { - Object[] tree = randomTree(10000, random); - for (int j = 0 ; j < perTreeSelections ; j++) - { - BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP); - for (Integer key : randomSelection(tree, random)) - if (key != searchIterator.next(key)) - errors.incrementAndGet(); - searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP); - for (Integer key : randomMix(tree, random)) - if (key != searchIterator.next(key)) - if (BTree.find(tree, ICMP, key) == key) - errors.incrementAndGet(); - count.incrementAndGet(); - } - } - latch.countDown(); - } - }); - } - while (latch.getCount() > 0) - { - latch.await(10L, TimeUnit.SECONDS); - System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : "")); - assert errors.get() == 0; - } - } - - private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException - { - int batchesPerTest = perTestCount / modificationBatchSize; - int maximumRunLength = 100; - int testKeyRange = perTestCount * testKeyRatio; - int tests = totalCount / perTestCount; - System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops", - tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize)); - - // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks - int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2)); - for (int chunk = 0 ; chunk < tests ; chunk += chunkSize) - { - final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>(); - for (int i = 0 ; i < chunkSize ; i++) - { - outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality)); - } - - final List<ListenableFuture<?>> inner = new ArrayList<>(); - int complete = 0; - int reportInterval = totalCount / 100; - int lastReportAt = 0; - for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer) - { - inner.addAll(f.get()); - complete += perTestCount; - if (complete - lastReportAt >= reportInterval) - { - System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount)); - lastReportAt = complete; - } - } - Futures.allAsList(inner).get(); - } - Snapshot snap = BTREE_TIMER.getSnapshot(); - System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile())); - snap = TREE_TIMER.getSnapshot(); - System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile())); - System.out.println("Done"); - } - - private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality) - { - ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>() - { - @Override - public List<ListenableFuture<?>> call() - { - final List<ListenableFuture<?>> r = new ArrayList<>(); - NavigableMap<Integer, Integer> canon = new TreeMap<>(); - Object[] btree = BTree.empty(); - final TreeMap<Integer, Integer> buffer = new TreeMap<>(); - final Random rnd = new Random(); - for (int i = 0 ; i < iterations ; i++) - { - buffer.clear(); - int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration); - while (mods > 0) - { - int v = rnd.nextInt(upperBound); - int rc = Math.max(0, Math.min(mods, maxRunLength) - 1); - int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc)); - for (int j = 0 ; j < c ; j++) - { - buffer.put(v, v); - v++; - } - mods -= c; - } - Timer.Context ctxt; - ctxt = TREE_TIMER.time(); - canon.putAll(buffer); - ctxt.stop(); - ctxt = BTREE_TIMER.time(); - Object[] next = null; - while (next == null) - next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT); - btree = next; - ctxt.stop(); - - if (!BTree.isWellFormed(btree, ICMP)) - { - System.out.println("ERROR: Not well formed"); - throw new AssertionError("Not well formed!"); - } - if (quickEquality) - testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator()); - else - r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet()))); - } - return r; - } - }); - MODIFY.execute(f); - return f; - } - - @Test - public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException - { - Object[] cur = BTree.empty(); - TreeSet<Integer> canon = new TreeSet<>(); - // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated - for (int i = 0 ; i < 128 ; i++) - { - String id = String.format("[0..%d)", canon.size()); - System.out.println("Testing " + id); - Futures.allAsList(testAllSlices(id, cur, canon)).get(); - Object[] next = null; - while (next == null) - next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT); - cur = next; - canon.add(i); - } - } - - static final Comparator<Integer> ICMP = new Comparator<Integer>() - { - @Override - public int compare(Integer o1, Integer o2) - { - return Integer.compare(o1, o2); - } - }; - - private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon) - { - List<ListenableFuture<?>> waitFor = new ArrayList<>(); - testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor); - testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor); - return waitFor; - } - - private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results) - { - testOneSlice(id, btree, canon, results); - for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending)) - { - // test head/tail sets - testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results); - testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results); - testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results); - testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results); - for (Integer ub : range(canon.size(), lb, ascending)) - { - // test subsets - testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results); - testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results); - testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results); - testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results); - } - } - } - - private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results) - { - ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable() - { - - @Override - public void run() - { - test(id + " Count", test.size(), canon.size()); - testEqual(id, test.iterator(), canon.iterator()); - testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator()); - testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator()); - testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator()); - } - }, null); - results.add(f); - COMPARE.execute(f); - } - - private static void test(String id, int test, int expect) - { - if (test != expect) - { - System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test)); - } - } - - private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon) - { - boolean equal = true; - while (btree.hasNext() && canon.hasNext()) - { - Object i = btree.next(); - Object j = canon.next(); - if (!i.equals(j)) - { - System.out.println(String.format("%s: Expected %d, Got %d", id, j, i)); - equal = false; - } - } - while (btree.hasNext()) - { - System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next())); - equal = false; - } - while (canon.hasNext()) - { - System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next())); - equal = false; - } - if (!equal) - throw new AssertionError("Not equal"); - } - - // should only be called on sets that range from 0->N or N->0 - private static final Iterable<Integer> range(final int size, final int from, final boolean ascending) - { - return new Iterable<Integer>() - { - int cur; - int delta; - int end; - { - if (ascending) - { - end = size + 1; - cur = from == Integer.MIN_VALUE ? -1 : from; - delta = 1; - } - else - { - end = -2; - cur = from == Integer.MIN_VALUE ? size : from; - delta = -1; - } - } - @Override - public Iterator<Integer> iterator() - { - return new Iterator<Integer>() - { - @Override - public boolean hasNext() - { - return cur != end; - } - - @Override - public Integer next() - { - Integer r = cur; - cur += delta; - return r; - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - }; - } - - private static Object[] randomTree(int maxSize, Random random) - { - TreeSet<Integer> build = new TreeSet<>(); - int size = random.nextInt(maxSize); - for (int i = 0 ; i < size ; i++) - { - build.add(random.nextInt()); - } - return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance()); - } - - private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd) - { - final float proportion = rnd.nextFloat(); - return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>() - { - public boolean apply(Integer integer) - { - return rnd.nextFloat() < proportion; - } - }); - } - - private static Iterable<Integer> randomMix(Object[] iter, final Random rnd) - { - final float proportion = rnd.nextFloat(); - return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>() - { - long last = Integer.MIN_VALUE; - - public Integer apply(Integer v) - { - long last = this.last; - this.last = v; - if (rnd.nextFloat() < proportion) - return v; - return (int)((v - last) / 2); - } - }); - } - - private static final class RandomAbort<V> implements UpdateFunction<V> - { - final Random rnd; - final float chance; - private RandomAbort(Random rnd, float chance) - { - this.rnd = rnd; - this.chance = chance; - } - - public V apply(V replacing, V update) - { - return update; - } - - public boolean abortEarly() - { - return rnd.nextFloat() < chance; - } - - public void allocated(long heapSize) - { - - } - - public V apply(V v) - { - return v; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index 3d3de84..cf76e75 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -3,7 +3,7 @@ # Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file. # cluster_name: Test Cluster -memtable_allocation_type: offheap_objects +memtable_allocation_type: heap_buffers commitlog_sync: batch commitlog_sync_batch_window_in_ms: 1.0 commitlog_segment_size_in_mb: 5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/conf/logback-test.xml ---------------------------------------------------------------------- diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml index 8cb2d6f..9facb83 100644 --- a/test/conf/logback-test.xml +++ b/test/conf/logback-test.xml @@ -68,7 +68,7 @@ <appender-ref ref="TEE"/> </appender> - <root level="DEBUG"> + <root level="INFO"> <appender-ref ref="ASYNC" /> </root> </configuration> http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db deleted file mode 100644 index 44d2e59..0000000 Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db deleted file mode 100644 index f75c4e6..0000000 Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db deleted file mode 100644 index 8f0a999..0000000 Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db deleted file mode 100644 index da84fbc..0000000 Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db deleted file mode 100644 index 0762615..0000000 Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db deleted file mode 100644 index 376ca9d..0000000 Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt deleted file mode 100644 index cf6efa8..0000000 --- a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt +++ /dev/null @@ -1,7 +0,0 @@ -CompressionInfo.db -TOC.txt -Filter.db -Statistics.db -Data.db -Summary.db -Index.db http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-CRC.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-CRC.db b/test/data/corrupt-sstables/la-1-big-CRC.db new file mode 100644 index 0000000..f13b9c7 Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-CRC.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Data.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-Data.db b/test/data/corrupt-sstables/la-1-big-Data.db new file mode 100644 index 0000000..dc516d8 Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Data.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Digest.adler32 ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-Digest.adler32 b/test/data/corrupt-sstables/la-1-big-Digest.adler32 new file mode 100644 index 0000000..e447277 --- /dev/null +++ b/test/data/corrupt-sstables/la-1-big-Digest.adler32 @@ -0,0 +1 @@ +2370519993 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Filter.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-Filter.db b/test/data/corrupt-sstables/la-1-big-Filter.db new file mode 100644 index 0000000..709d2ea Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Filter.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Index.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-Index.db b/test/data/corrupt-sstables/la-1-big-Index.db new file mode 100644 index 0000000..178221e Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Index.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Statistics.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-Statistics.db b/test/data/corrupt-sstables/la-1-big-Statistics.db new file mode 100644 index 0000000..23b76ac Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Statistics.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Summary.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-Summary.db b/test/data/corrupt-sstables/la-1-big-Summary.db new file mode 100644 index 0000000..732f27c Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Summary.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-TOC.txt ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/la-1-big-TOC.txt b/test/data/corrupt-sstables/la-1-big-TOC.txt new file mode 100644 index 0000000..9cbcd44 --- /dev/null +++ b/test/data/corrupt-sstables/la-1-big-TOC.txt @@ -0,0 +1,8 @@ +Statistics.db +Filter.db +Data.db +Summary.db +Digest.adler32 +CRC.db +TOC.txt +Index.db http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java b/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java deleted file mode 100644 index a0bacea..0000000 --- a/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3; - -import org.junit.After; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.SchemaLoader; - -import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; - -/** - * Base class for CQL tests. - */ -public class DropKeyspaceCommitLogRecycleTest -{ - protected static final Logger logger = LoggerFactory.getLogger(DropKeyspaceCommitLogRecycleTest.class); - - private static final String KEYSPACE = "cql_test_keyspace"; - private static final String KEYSPACE2 = "cql_test_keyspace2"; - - static - { - // Once per-JVM is enough - SchemaLoader.prepareServer(); - } - - private void create(boolean both) - { - executeOnceInternal(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE)); - executeOnceInternal(String.format("CREATE TABLE %s.test (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))", KEYSPACE)); - - if (both) - { - executeOnceInternal(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE2)); - executeOnceInternal(String.format("CREATE TABLE %s.test (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))", KEYSPACE2)); - } - } - - private void insert() - { - executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (0, 0, 0)", KEYSPACE)); - executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (1, 1, 1)", KEYSPACE)); - executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (2, 2, 2)", KEYSPACE)); - - executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (0, 0, 0)", KEYSPACE2)); - executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (1, 1, 1)", KEYSPACE2)); - executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (2, 2, 2)", KEYSPACE2)); - } - - private void drop(boolean both) - { - executeOnceInternal(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE)); - if (both) - executeOnceInternal(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE2)); - } - - @Test - public void testRecycle() - { - for (int i = 0 ; i < 1000 ; i++) - { - create(i == 0); - insert(); - drop(false); - } - } - - @After - public void afterTest() throws Throwable - { - drop(true); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java deleted file mode 100644 index 24993c8..0000000 --- a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.apache.cassandra.db; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.locator.SimpleStrategy; -import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class LongFlushMemtableTest -{ - public static final String KEYSPACE1 = "LongFlushMemtableTest"; - - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - SchemaLoader.loadSchema(); - SchemaLoader.createKeyspace(KEYSPACE1, - SimpleStrategy.class, - KSMetaData.optsWithRF(1)); - } - - @Test - public void testFlushMemtables() throws ConfigurationException - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - for (int i = 0; i < 100; i++) - { - CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance); - MigrationManager.announceNewColumnFamily(metadata); - } - - for (int j = 0; j < 200; j++) - { - for (int i = 0; i < 100; i++) - { - Mutation rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("key" + j)); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "_CF" + i); - // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory - ByteBuffer value = ByteBuffer.allocate(100000); - cf.addColumn(new BufferCell(Util.cellname("c"), value)); - rm.add(cf); - rm.applyUnsafe(); - } - } - - int flushes = 0; - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - if (cfs.name.startsWith("_CF")) - flushes += cfs.metric.memtableSwitchCount.getCount(); - } - assert flushes > 0; - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/LongKeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java b/test/long/org/apache/cassandra/db/LongKeyspaceTest.java deleted file mode 100644 index fe22da8..0000000 --- a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db; - -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.locator.SimpleStrategy; -import org.apache.cassandra.utils.WrappedRunnable; -import static org.apache.cassandra.Util.column; - -import org.apache.cassandra.Util; - -public class LongKeyspaceTest -{ - public static final String KEYSPACE1 = "LongKeyspaceTest"; - public static final String CF_STANDARD = "Standard1"; - - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); - } - - @Test - public void testGetRowMultiColumn() throws Throwable - { - final Keyspace keyspace = Keyspace.open(KEYSPACE1); - final ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore("Standard1"); - - for (int i = 1; i < 5000; i += 100) - { - Mutation rm = new Mutation(KEYSPACE1, Util.dk("key" + i).getKey()); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1"); - for (int j = 0; j < i; j++) - cf.addColumn(column("c" + j, "v" + j, 1L)); - rm.add(cf); - rm.applyUnsafe(); - } - - Runnable verify = new WrappedRunnable() - { - public void runMayThrow() throws Exception - { - ColumnFamily cf; - for (int i = 1; i < 5000; i += 100) - { - for (int j = 0; j < i; j++) - { - cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk("key" + i), "c" + j)); - KeyspaceTest.assertColumns(cf, "c" + j); - } - } - - } - }; - KeyspaceTest.reTest(keyspace.getColumnFamilyStore("Standard1"), verify); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java deleted file mode 100644 index b4efd49..0000000 --- a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.apache.cassandra.db.commitlog; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.cassandra.Util; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.UUIDGen; - -public class ComitLogStress -{ - - public static final String format = "%s,%s,%s,%s,%s,%s"; - - public static void main(String[] args) throws Exception { - int NUM_THREADS = Runtime.getRuntime().availableProcessors(); - if (args.length >= 1) { - NUM_THREADS = Integer.parseInt(args[0]); - System.out.println("Setting num threads to: " + NUM_THREADS); - } - ExecutorService executor = new JMXEnabledThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60, - TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * NUM_THREADS), new NamedThreadFactory("Stress"), ""); - ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); - - org.apache.cassandra.SchemaLoader.loadSchema(); - org.apache.cassandra.SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour - final AtomicLong count = new AtomicLong(); - final long start = System.currentTimeMillis(); - System.out.println(String.format(format, "seconds", "max_mb", "allocated_mb", "free_mb", "diffrence", "count")); - scheduled.scheduleAtFixedRate(new Runnable() { - long lastUpdate = 0; - - public void run() { - Runtime runtime = Runtime.getRuntime(); - long maxMemory = mb(runtime.maxMemory()); - long allocatedMemory = mb(runtime.totalMemory()); - long freeMemory = mb(runtime.freeMemory()); - long temp = count.get(); - System.out.println(String.format(format, ((System.currentTimeMillis() - start) / 1000), - maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate)); - lastUpdate = temp; - } - }, 1, 1, TimeUnit.SECONDS); - - while (true) { - executor.execute(new CommitlogExecutor()); - count.incrementAndGet(); - } - } - - private static long mb(long maxMemory) { - return maxMemory / (1024 * 1024); - } - - static final String keyString = UUIDGen.getTimeUUID().toString(); - public static class CommitlogExecutor implements Runnable { - public void run() { - String ks = "Keyspace1"; - ByteBuffer key = ByteBufferUtil.bytes(keyString); - for (int i=0; i<100; ++i) { - Mutation mutation = new Mutation(ks, key); - mutation.add("Standard1", Util.cellname("name"), ByteBufferUtil.bytes("value" + i), - System.currentTimeMillis()); - CommitLog.instance.add(mutation); - } - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 5897dec..9b4dde7 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -48,15 +49,20 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.config.Config.CommitLogSync; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnSerializer; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.utils.FBUtilities; public class CommitLogStressTest { @@ -354,8 +360,7 @@ public class CommitLogStressTest } double time = (System.currentTimeMillis() - start) / 1000.0; double avg = (temp / time); - System.out - .println( + System.out.println( String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb", ((System.currentTimeMillis() - start) / 1000), mb(maxMemory), @@ -421,20 +426,20 @@ public class CommitLogStressTest { if (rl != null) rl.acquire(); - String ks = "Keyspace1"; ByteBuffer key = randomBytes(16, rand); - Mutation mutation = new Mutation(ks, key); + UpdateBuilder builder = UpdateBuilder.create(Schema.instance.getCFMetaData("Keyspace1", "Standard1"), Util.dk(key)); for (int ii = 0; ii < numCells; ii++) { int sz = randomSize ? rand.nextInt(cellSize) : cellSize; ByteBuffer bytes = randomBytes(sz, rand); - mutation.add("Standard1", Util.cellname("name" + ii), bytes, System.currentTimeMillis()); + builder.newRow("name" + ii).add("val", bytes); hash = hash(hash, bytes); ++cells; dataSize += sz; } - rp = commitLog.add(mutation); + + rp = commitLog.add(new Mutation(builder.build())); counter.incrementAndGet(); } } @@ -468,7 +473,7 @@ public class CommitLogStressTest { mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), desc.getMessagingVersion(), - ColumnSerializer.Flag.LOCAL); + SerializationHelper.Flag.LOCAL); } catch (IOException e) { @@ -476,18 +481,24 @@ public class CommitLogStressTest throw new AssertionError(e); } - for (ColumnFamily cf : mutation.getColumnFamilies()) + for (PartitionUpdate cf : mutation.getPartitionUpdates()) { - for (Cell c : cf.getSortedColumns()) + + Iterator<Row> rowIterator = cf.iterator(); + + while (rowIterator.hasNext()) { - if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name")) + Row row = rowIterator.next(); + if (!(UTF8Type.instance.compose(row.clustering().get(0)).startsWith("name"))) + continue; + + for (Cell cell : row) { - hash = hash(hash, c.value()); + hash = hash(hash, cell.value()); ++cells; } } } } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java index e6c8f56..ca223ca 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java @@ -26,12 +26,14 @@ import org.junit.BeforeClass; import org.junit.Before; import org.junit.Test; +import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.Util; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.SSTableUtils; @@ -93,7 +95,7 @@ public class LongCompactionsTest testCompaction(100, 800, 5); } - protected void testCompaction(int sstableCount, int rowsPerSSTable, int colsPerRow) throws Exception + protected void testCompaction(int sstableCount, int partitionsPerSSTable, int rowsPerPartition) throws Exception { CompactionManager.instance.disableAutoCompaction(); @@ -103,17 +105,16 @@ public class LongCompactionsTest ArrayList<SSTableReader> sstables = new ArrayList<>(); for (int k = 0; k < sstableCount; k++) { - SortedMap<String,ColumnFamily> rows = new TreeMap<String,ColumnFamily>(); - for (int j = 0; j < rowsPerSSTable; j++) + SortedMap<String, PartitionUpdate> rows = new TreeMap<>(); + for (int j = 0; j < partitionsPerSSTable; j++) { String key = String.valueOf(j); - Cell[] cols = new Cell[colsPerRow]; - for (int i = 0; i < colsPerRow; i++) - { - // last sstable has highest timestamps - cols[i] = Util.column(String.valueOf(i), String.valueOf(i), k); - } - rows.put(key, SSTableUtils.createCF(KEYSPACE1, CF_STANDARD, Long.MIN_VALUE, Integer.MIN_VALUE, cols)); + // last sstable has highest timestamps + UpdateBuilder builder = UpdateBuilder.create(store.metadata, String.valueOf(j)) + .withTimestamp(k); + for (int i = 0; i < rowsPerPartition; i++) + builder.newRow(String.valueOf(i)).add("val", String.valueOf(i)); + rows.put(key, builder.build()); } SSTableReader sstable = SSTableUtils.prepare().write(rows); sstables.add(sstable); @@ -133,8 +134,8 @@ public class LongCompactionsTest System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms", this.getClass().getName(), sstableCount, - rowsPerSSTable, - colsPerRow, + partitionsPerSSTable, + rowsPerPartition, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))); } @@ -157,23 +158,23 @@ public class LongCompactionsTest for (int j = 0; j < SSTABLES; j++) { for (int i = 0; i < ROWS_PER_SSTABLE; i++) { DecoratedKey key = Util.dk(String.valueOf(i % 2)); - Mutation rm = new Mutation(KEYSPACE1, key.getKey()); long timestamp = j * ROWS_PER_SSTABLE + i; - rm.add("Standard1", Util.cellname(String.valueOf(i / 2)), - ByteBufferUtil.EMPTY_BYTE_BUFFER, - timestamp); maxTimestampExpected = Math.max(timestamp, maxTimestampExpected); - rm.apply(); + UpdateBuilder.create(cfs.metadata, key) + .withTimestamp(timestamp) + .newRow(String.valueOf(i / 2)).add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .apply(); + inserted.add(key); } cfs.forceBlockingFlush(); CompactionsTest.assertMaxTimestamp(cfs, maxTimestampExpected); - assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(cfs).size()); + + assertEquals(inserted.toString(), inserted.size(), Util.getAll(Util.cmd(cfs).build()).size()); } forceCompactions(cfs); - - assertEquals(inserted.size(), Util.getRangeSlice(cfs).size()); + assertEquals(inserted.toString(), inserted.size(), Util.getAll(Util.cmd(cfs).build()).size()); // make sure max timestamp of compacted sstables is recorded properly after compaction. CompactionsTest.assertMaxTimestamp(cfs, maxTimestampExpected); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index cc8203e..fbee72a 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.exceptions.ConfigurationException; @@ -75,11 +76,11 @@ public class LongLeveledCompactionStrategyTest for (int r = 0; r < rows; r++) { DecoratedKey key = Util.dk(String.valueOf(r)); - Mutation rm = new Mutation(ksname, key.getKey()); + UpdateBuilder builder = UpdateBuilder.create(store.metadata, key); for (int c = 0; c < columns; c++) - { - rm.add(cfname, Util.cellname("column" + c), value, 0); - } + builder.newRow("column" + c).add("val", value); + + Mutation rm = new Mutation(builder.build()); rm.apply(); store.forceBlockingFlush(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java b/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java new file mode 100644 index 0000000..575036f --- /dev/null +++ b/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +public abstract class AbstractReadCommandBuilder +{ + protected final ColumnFamilyStore cfs; + protected int nowInSeconds; + + private int cqlLimit = -1; + private int pagingLimit = -1; + private boolean reversed = false; + + private Set<ColumnIdentifier> columns; + protected final RowFilter filter = RowFilter.create(); + + private Slice.Bound lowerClusteringBound; + private Slice.Bound upperClusteringBound; + + private NavigableSet<Clustering> clusterings; + + // Use Util.cmd() instead of this ctor directly + AbstractReadCommandBuilder(ColumnFamilyStore cfs) + { + this.cfs = cfs; + this.nowInSeconds = FBUtilities.nowInSeconds(); + } + + public AbstractReadCommandBuilder withNowInSeconds(int nowInSec) + { + this.nowInSeconds = nowInSec; + return this; + } + + public AbstractReadCommandBuilder fromIncl(Object... values) + { + assert lowerClusteringBound == null && clusterings == null; + this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values); + return this; + } + + public AbstractReadCommandBuilder fromExcl(Object... values) + { + assert lowerClusteringBound == null && clusterings == null; + this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values); + return this; + } + + public AbstractReadCommandBuilder toIncl(Object... values) + { + assert upperClusteringBound == null && clusterings == null; + this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values); + return this; + } + + public AbstractReadCommandBuilder toExcl(Object... values) + { + assert upperClusteringBound == null && clusterings == null; + this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values); + return this; + } + + public AbstractReadCommandBuilder includeRow(Object... values) + { + assert lowerClusteringBound == null && upperClusteringBound == null; + + if (this.clusterings == null) + this.clusterings = new TreeSet<>(cfs.metadata.comparator); + + this.clusterings.add(cfs.metadata.comparator.make(values)); + return this; + } + + public AbstractReadCommandBuilder reverse() + { + this.reversed = true; + return this; + } + + public AbstractReadCommandBuilder withLimit(int newLimit) + { + this.cqlLimit = newLimit; + return this; + } + + public AbstractReadCommandBuilder withPagingLimit(int newLimit) + { + this.pagingLimit = newLimit; + return this; + } + + public AbstractReadCommandBuilder columns(String... columns) + { + if (this.columns == null) + this.columns = new HashSet<>(); + + for (String column : columns) + this.columns.add(ColumnIdentifier.getInterned(column, true)); + return this; + } + + private ByteBuffer bb(Object value, AbstractType<?> type) + { + return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value); + } + + private AbstractType<?> forValues(AbstractType<?> collectionType) + { + assert collectionType instanceof CollectionType; + CollectionType ct = (CollectionType)collectionType; + switch (ct.kind) + { + case LIST: + case MAP: + return ct.valueComparator(); + case SET: + return ct.nameComparator(); + } + throw new AssertionError(); + } + + private AbstractType<?> forKeys(AbstractType<?> collectionType) + { + assert collectionType instanceof CollectionType; + CollectionType ct = (CollectionType)collectionType; + switch (ct.kind) + { + case LIST: + case MAP: + return ct.nameComparator(); + } + throw new AssertionError(); + } + + public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value) + { + ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true)); + assert def != null; + + AbstractType<?> type = def.type; + if (op == Operator.CONTAINS) + type = forValues(type); + else if (op == Operator.CONTAINS_KEY) + type = forKeys(type); + + this.filter.add(def, op, bb(value, type)); + return this; + } + + protected ColumnFilter makeColumnFilter() + { + if (columns == null) + return ColumnFilter.all(cfs.metadata); + + ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(cfs.metadata); + for (ColumnIdentifier column : columns) + builder.add(cfs.metadata.getColumnDefinition(column)); + return builder.build(); + } + + protected ClusteringIndexFilter makeFilter() + { + if (clusterings != null) + { + return new ClusteringIndexNamesFilter(clusterings, reversed); + } + else + { + Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound, + upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound); + return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed); + } + } + + protected DataLimits makeLimits() + { + DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit); + if (pagingLimit >= 0) + limits = limits.forPaging(pagingLimit); + return limits; + } + + public Row getOnlyRow() + { + return Util.getOnlyRow(build()); + } + + public Row getOnlyRowUnfiltered() + { + return Util.getOnlyRowUnfiltered(build()); + } + + public FilteredPartition getOnlyPartition() + { + return Util.getOnlyPartition(build()); + } + + public Partition getOnlyPartitionUnfiltered() + { + return Util.getOnlyPartitionUnfiltered(build()); + } + + public abstract ReadCommand build(); + + public static class SinglePartitionBuilder extends AbstractReadCommandBuilder + { + private final DecoratedKey partitionKey; + + SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key) + { + super(cfs); + this.partitionKey = key; + } + + @Override + public ReadCommand build() + { + return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter()); + } + } + + public static class PartitionRangeBuilder extends AbstractReadCommandBuilder + { + private DecoratedKey startKey; + private boolean startInclusive; + private DecoratedKey endKey; + private boolean endInclusive; + + PartitionRangeBuilder(ColumnFamilyStore cfs) + { + super(cfs); + } + + public PartitionRangeBuilder fromKeyIncl(Object... values) + { + assert startKey == null; + this.startInclusive = true; + this.startKey = Util.makeKey(cfs.metadata, values); + return this; + } + + public PartitionRangeBuilder fromKeyExcl(Object... values) + { + assert startKey == null; + this.startInclusive = false; + this.startKey = Util.makeKey(cfs.metadata, values); + return this; + } + + public PartitionRangeBuilder toKeyIncl(Object... values) + { + assert endKey == null; + this.endInclusive = true; + this.endKey = Util.makeKey(cfs.metadata, values); + return this; + } + + public PartitionRangeBuilder toKeyExcl(Object... values) + { + assert endKey == null; + this.endInclusive = false; + this.endKey = Util.makeKey(cfs.metadata, values); + return this; + } + + @Override + public ReadCommand build() + { + PartitionPosition start = startKey; + if (start == null) + { + start = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); + startInclusive = false; + } + PartitionPosition end = endKey; + if (end == null) + { + end = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); + endInclusive = true; + } + + AbstractBounds<PartitionPosition> bounds; + if (startInclusive && endInclusive) + bounds = new Bounds<PartitionPosition>(start, end); + else if (startInclusive && !endInclusive) + bounds = new IncludingExcludingBounds<PartitionPosition>(start, end); + else if (!startInclusive && endInclusive) + bounds = new Range<PartitionPosition>(start, end); + else + bounds = new ExcludingBounds<PartitionPosition>(start, end); + + return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter())); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/EmbeddedServer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/EmbeddedServer.java b/test/unit/org/apache/cassandra/EmbeddedServer.java deleted file mode 100644 index 25754ea..0000000 --- a/test/unit/org/apache/cassandra/EmbeddedServer.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cassandra; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.service.CassandraDaemon; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -public class EmbeddedServer extends SchemaLoader -{ - protected static CassandraDaemon daemon = null; - - enum GatewayService - { - Thrift - } - - public static GatewayService getDaemonGatewayService() - { - return GatewayService.Thrift; - } - - static ExecutorService executor = Executors.newSingleThreadExecutor(); - - @BeforeClass - public static void startCassandra() - - { - executor.execute(new Runnable() - { - public void run() - { - switch (getDaemonGatewayService()) - { - case Thrift: - default: - daemon = new org.apache.cassandra.service.CassandraDaemon(); - } - daemon.activate(); - } - }); - try - { - TimeUnit.SECONDS.sleep(3); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - } - - @AfterClass - public static void stopCassandra() throws Exception - { - if (daemon != null) - { - daemon.deactivate(); - } - executor.shutdown(); - executor.shutdownNow(); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/MockSchema.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java index c71c98b..f0a849b 100644 --- a/test/unit/org/apache/cassandra/MockSchema.java +++ b/test/unit/org/apache/cassandra/MockSchema.java @@ -21,7 +21,7 @@ package org.apache.cassandra; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableMap; @@ -32,7 +32,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.SimpleSparseCellNameType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.io.sstable.Component; @@ -119,12 +118,13 @@ public class MockSchema throw new RuntimeException(e); } } + SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST); StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) - .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1) + .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header) .get(MetadataType.STATS); SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, Murmur3Partitioner.instance, segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(), - new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL); + new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header); reader.first = reader.last = readerBounds(generation); if (!keepRef) reader.selfRef().release(); @@ -140,10 +140,11 @@ public class MockSchema private static CFMetaData newCFMetaData(String ksname, String cfname) { - CFMetaData metadata = new CFMetaData(ksname, - cfname, - ColumnFamilyType.Standard, - new SimpleSparseCellNameType(UTF8Type.instance)); + CFMetaData metadata = CFMetaData.Builder.create(ksname, cfname) + .addPartitionKey("key", UTF8Type.instance) + .addClusteringColumn("col", UTF8Type.instance) + .addRegularColumn("value", UTF8Type.instance) + .build(); metadata.caching(CachingOptions.NONE); return metadata; }
