Updated Branches: refs/heads/trunk fe4247e58 -> 812504713
Replace UnsortedColumns usage with ArrayBackedSortedColumns patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-6630 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/81250471 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/81250471 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/81250471 Branch: refs/heads/trunk Commit: 812504713523c2b8fbff394fbf4448ea30b5e4a3 Parents: fe4247e Author: Aleksey Yeschenko <[email protected]> Authored: Thu Feb 6 04:57:56 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Thu Feb 6 04:57:56 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/UpdateStatement.java | 2 +- .../cassandra/db/ArrayBackedSortedColumns.java | 14 ------ .../apache/cassandra/db/AtomicBTreeColumns.java | 9 +--- .../org/apache/cassandra/db/ColumnFamily.java | 6 ++- src/java/org/apache/cassandra/db/Mutation.java | 2 +- .../apache/cassandra/service/paxos/Commit.java | 5 ++- .../service/paxos/PrepareResponse.java | 10 +++-- .../cassandra/thrift/CassandraServer.java | 4 +- .../apache/cassandra/db/CounterCacheTest.java | 2 +- .../cassandra/db/CounterMutationTest.java | 46 +++++++++++++++----- 11 files changed, 56 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7d628b5..a139fdc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -25,6 +25,7 @@ * CF id is changed to be non-deterministic. Data dir/key cache are created uniquely for CF id (CASSANDRA-5202) * New counters implementation (CASSANDRA-6504) + * Replace UnsortedColumns usage with ArrayBackedSortedColumns (CASSANDRA-6630) 2.0.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 1102c09..6ed0e33 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -99,7 +99,7 @@ public class UpdateStatement extends ModificationStatement public ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException { - ColumnFamily cf = UnsortedColumns.factory.create(cfm); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm); addUpdateForKey(cf, key, prefix, params); return cf; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java index 7bcbe25..b81e403 100644 --- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java @@ -91,16 +91,6 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns return pos >= 0 ? cells.get(pos) : null; } - /** - * AddColumn throws an exception if the cell added does not sort after - * the last cell in the map. - * The reasoning is that this implementation can get slower if too much - * insertions are done in unsorted order and right now we only use it when - * *all* insertion (with this method) are done in sorted order. The - * assertion throwing is thus a protection against performance regression - * without knowing about (we can revisit that decision later if we have - * use cases where most insert are in sorted order but a few are not). - */ public void addColumn(Cell cell, AbstractAllocator allocator) { if (cells.isEmpty()) @@ -109,11 +99,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns return; } - // Fast path if inserting at the tail int c = internalComparator().compare(cells.get(getColumnCount() - 1).name(), cell.name()); - // note that we want an assertion here (see addColumn javadoc), but we also want that if - // assertion are disabled, addColumn works correctly with unsorted input - assert c <= 0 : "Added cell does not sort as the " + (reversed ? "first" : "last") + " cell"; if (c < 0) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java index fd7d4bc..c1c7b66 100644 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java @@ -234,14 +234,7 @@ public class AtomicBTreeColumns extends ColumnFamily public Delta addAllWithSizeDelta(final ColumnFamily cm, AbstractAllocator allocator, Function<Cell, Cell> transformation, Updater indexer, Delta delta) { boolean transformed = false; - Collection<Cell> insert; - if (cm instanceof UnsortedColumns) - { - insert = transform(metadata.comparator.columnComparator(), cm, transformation, true); - transformed = true; - } - else - insert = cm.getSortedColumns(); + Collection<Cell> insert = cm.getSortedColumns(); while (true) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index 9d2856d..2df3fbf 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -491,7 +491,6 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry return builder.build(); } - // Note: the returned ColumnFamily will be an UnsortedColumns. public static ColumnFamily fromBytes(ByteBuffer bytes) { if (bytes == null) @@ -499,7 +498,10 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry try { - return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)), UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, MessagingService.current_version); + return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)), + ArrayBackedSortedColumns.factory, + ColumnSerializer.Flag.LOCAL, + MessagingService.current_version); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 31d9503..ef9b02d 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -310,7 +310,7 @@ public class Mutation implements IMutation private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException { - ColumnFamily cf = ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, flag, version); + ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version); // We don't allow Mutation with null column family, so we should never get null back. assert cf != null; return cf; http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/service/paxos/Commit.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 23f35db..aae9b72 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -139,7 +139,10 @@ public class Commit { return new Commit(ByteBufferUtil.readWithShortLength(in), UUIDSerializer.serializer.deserialize(in, version), - ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version)); + ColumnFamily.serializer.deserialize(in, + ArrayBackedSortedColumns.factory, + ColumnSerializer.Flag.LOCAL, + version)); } public long serializedSize(Commit commit, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java index d2bd835..14e0bc7 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java @@ -26,9 +26,9 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.cassandra.db.ArrayBackedSortedColumns; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnSerializer; -import org.apache.cassandra.db.UnsortedColumns; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDSerializer; @@ -82,10 +82,14 @@ public class PrepareResponse return new PrepareResponse(success, new Commit(key, UUIDSerializer.serializer.deserialize(in, version), - ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version)), + ColumnFamily.serializer.deserialize(in, + ArrayBackedSortedColumns.factory, + ColumnSerializer.Flag.LOCAL, version)), new Commit(key, UUIDSerializer.serializer.deserialize(in, version), - ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version))); + ColumnFamily.serializer.deserialize(in, + ArrayBackedSortedColumns.factory, + ColumnSerializer.Flag.LOCAL, version))); } public long serializedSize(PrepareResponse response, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index fe9dc3f..44fb22e 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -754,7 +754,7 @@ public class CassandraServer implements Cassandra.Iface ThriftValidation.validateColumnData(metadata, null, column); CFMetaData cfm = Schema.instance.getCFMetaData(cState.getKeyspace(), column_family); - UnsortedColumns cfUpdates = UnsortedColumns.factory.create(cfm); + ColumnFamily cfUpdates = ArrayBackedSortedColumns.factory.create(cfm); for (Column column : updates) cfUpdates.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp); @@ -765,7 +765,7 @@ public class CassandraServer implements Cassandra.Iface } else { - cfExpected = TreeMapBackedSortedColumns.factory.create(cfm); + cfExpected = ArrayBackedSortedColumns.factory.create(cfm); for (Column column : expected) cfExpected.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/test/unit/org/apache/cassandra/db/CounterCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java index 78e7c80..a015a43 100644 --- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java +++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java @@ -74,7 +74,7 @@ public class CounterCacheTest extends SchemaLoader ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); CacheService.instance.invalidateCounterCache(); - ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata); + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addColumn(new CounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros())); cells.addColumn(new CounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros())); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/test/unit/org/apache/cassandra/db/CounterMutationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java index 3676ef9..431531c 100644 --- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java +++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.db; +import java.nio.ByteBuffer; + import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -45,21 +47,21 @@ public class CounterMutationTest extends SchemaLoader cfs.truncateBlocking(); // Do the initial update (+1) - ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata); + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), 1L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis())); assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value())); // Make another increment (+2) - cells = UnsortedColumns.factory.create(cfs.metadata); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), 2L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis())); assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value())); // Decrement to 0 (-3) - cells = UnsortedColumns.factory.create(cfs.metadata); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), -3L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis())); @@ -74,7 +76,7 @@ public class CounterMutationTest extends SchemaLoader cfs.truncateBlocking(); // Do the initial update (+1, -1) - ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata); + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), 1L); cells.addCounter(cellname(2), -1L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); @@ -83,7 +85,7 @@ public class CounterMutationTest extends SchemaLoader assertEquals(-1L, CounterContext.instance().total(current.getColumn(cellname(2)).value())); // Make another increment (+2, -2) - cells = UnsortedColumns.factory.create(cfs.metadata); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), 2L); cells.addCounter(cellname(2), -2L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); @@ -91,7 +93,7 @@ public class CounterMutationTest extends SchemaLoader assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value())); // Decrement to 0 (-3, +3) - cells = UnsortedColumns.factory.create(cfs.metadata); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), -3L); cells.addCounter(cellname(2), 3L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); @@ -114,11 +116,11 @@ public class CounterMutationTest extends SchemaLoader cfs2.truncateBlocking(); // Do the update (+1, -1), (+2, -2) - ColumnFamily cells1 = UnsortedColumns.factory.create(cfs1.metadata); + ColumnFamily cells1 = ArrayBackedSortedColumns.factory.create(cfs1.metadata); cells1.addCounter(cellname(1), 1L); cells1.addCounter(cellname(2), -1L); - ColumnFamily cells2 = UnsortedColumns.factory.create(cfs2.metadata); + ColumnFamily cells2 = ArrayBackedSortedColumns.factory.create(cfs2.metadata); cells2.addCounter(cellname(1), 2L); cells2.addCounter(cellname(2), -2L); @@ -151,7 +153,7 @@ public class CounterMutationTest extends SchemaLoader cfs.truncateBlocking(); // Do the initial update (+1, -1) - ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata); + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), 1L); cells.addCounter(cellname(2), 1L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); @@ -160,7 +162,7 @@ public class CounterMutationTest extends SchemaLoader assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(2)).value())); // Remove the first counter, increment the second counter - cells = UnsortedColumns.factory.create(cfs.metadata); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addTombstone(cellname(1), (int) System.currentTimeMillis() / 1000, FBUtilities.timestampMicros()); cells.addCounter(cellname(2), 1L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); @@ -169,7 +171,7 @@ public class CounterMutationTest extends SchemaLoader assertEquals(2L, CounterContext.instance().total(current.getColumn(cellname(2)).value())); // Increment the first counter, make sure it's still shadowed by the tombstone - cells = UnsortedColumns.factory.create(cfs.metadata); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), 1L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis())); @@ -184,7 +186,7 @@ public class CounterMutationTest extends SchemaLoader assertNull(current.getColumn(cellname(2))); // Increment both counters, ensure that both stay dead - cells = UnsortedColumns.factory.create(cfs.metadata); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); cells.addCounter(cellname(1), 1L); cells.addCounter(cellname(2), 1L); new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); @@ -192,4 +194,24 @@ public class CounterMutationTest extends SchemaLoader assertNull(current.getColumn(cellname(1))); assertNull(current.getColumn(cellname(2))); } + + @Test + public void testDuplicateCells() throws WriteTimeoutException + { + ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addCounter(cellname(1), 1L); + cells.addCounter(cellname(1), 2L); + cells.addCounter(cellname(1), 3L); + cells.addCounter(cellname(1), 4L); + new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); + + ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis())); + ByteBuffer context = current.getColumn(cellname(1)).value(); + assertEquals(10L, CounterContext.instance().total(context)); + assertEquals(ClockAndCount.create(1L, 10L), CounterContext.instance().getLocalClockAndCount(context)); + assertEquals(ClockAndCount.create(1L, 10L), cfs.getCachedCounter(bytes(1), cellname(1))); + } }
