This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 208af4c658 Introduce RangeDeps 208af4c658 is described below commit 208af4c658df3779fbca5d7826125d37424deb37 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Mon Jan 9 13:19:16 2023 +0000 Introduce RangeDeps Refactor Deps into KeyDeps and RangeDeps patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18173 --- .build/include-accord.sh | 2 +- .../service/accord/AccordObjectSizes.java | 35 +++-- .../service/accord/serializers/DepsSerializer.java | 163 +++++++++++++++------ .../service/accord/AccordCommandStoreTest.java | 11 +- .../service/accord/AccordCommandTest.java | 9 +- 5 files changed, 156 insertions(+), 64 deletions(-) diff --git a/.build/include-accord.sh b/.build/include-accord.sh index eabe4d204b..e7cedab46e 100755 --- a/.build/include-accord.sh +++ b/.build/include-accord.sh @@ -25,7 +25,7 @@ set -o nounset bin="$(cd "$(dirname "$0")" > /dev/null; pwd)" accord_repo='https://github.com/apache/cassandra-accord.git' -accord_branch='5626c7c11400d4cf6d01a8e22517b53a83f5c512' +accord_branch='686326eedc8d2553c98a30abc81a925be3942b8c' accord_src="$bin/cassandra-accord" checkout() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index 0246ee6780..ca18f67c75 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -18,8 +18,6 @@ package org.apache.cassandra.service.accord; -import java.util.Map; - import accord.api.Key; import accord.api.RoutingKey; import accord.local.Node; @@ -28,16 +26,17 @@ import accord.primitives.AbstractRanges; import accord.primitives.Deps; import accord.primitives.FullKeyRoute; import accord.primitives.FullRangeRoute; +import accord.primitives.KeyDeps; import accord.primitives.Keys; import accord.primitives.PartialKeyRoute; import 
accord.primitives.PartialRangeRoute; import accord.primitives.PartialTxn; import accord.primitives.Range; +import accord.primitives.RangeDeps; import accord.primitives.Ranges; import accord.primitives.RoutingKeys; import accord.primitives.Seekables; import accord.primitives.Timestamp; -import accord.primitives.TxnId; import accord.primitives.Unseekables; import accord.primitives.Writes; import org.apache.cassandra.service.accord.api.PartitionKey; @@ -61,16 +60,16 @@ public class AccordObjectSizes return ((AccordRoutingKey) key).estimatedSizeOnHeap(); } - private static final long EMPTY_KEY_RANGE_SIZE = ObjectSizes.measure(TokenRange.fullRange("")); + private static final long EMPTY_RANGE_SIZE = ObjectSizes.measure(TokenRange.fullRange("")); public static long range(Range range) { - return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end()); + return EMPTY_RANGE_SIZE + key(range.start()) + key(range.end()); } - private static final long EMPTY_KEY_RANGES_SIZE = ObjectSizes.measure(Ranges.of()); + private static final long EMPTY_RANGES_SIZE = ObjectSizes.measure(Ranges.of()); public static long ranges(Ranges ranges) { - long size = EMPTY_KEY_RANGES_SIZE; + long size = EMPTY_RANGES_SIZE; size += ObjectSizes.sizeOfReferenceArray(ranges.size()); // TODO: many ranges are fixed size, can compute by multiplication for (int i = 0, mi = ranges.size() ; i < mi ; i++) @@ -196,12 +195,22 @@ public class AccordObjectSizes private static final long EMPTY_DEPS_SIZE = ObjectSizes.measureDeep(Deps.NONE); public static long dependencies(Deps dependencies) { - long size = EMPTY_DEPS_SIZE; - for (Map.Entry<Key, TxnId> entry : dependencies) - { - size += key(entry.getKey()); - size += timestamp(entry.getValue()); - } + // TODO (expected): this doesn't measure the backing arrays, is inefficient; + // doesn't account for txnIdToKeys, txnIdToRanges, and searchable fields; + // fix to account for, in case caching isn't redone + long size = EMPTY_DEPS_SIZE - EMPTY_KEYS_SIZE - 
ObjectSizes.sizeOfReferenceArray(0); + size += keys(dependencies.keyDeps.keys()); + for (int i = 0 ; i < dependencies.rangeDeps.rangeCount() ; ++i) + size += range(dependencies.rangeDeps.range(i)); + size += ObjectSizes.sizeOfReferenceArray(dependencies.rangeDeps.rangeCount()); + + for (int i = 0 ; i < dependencies.keyDeps.txnIdCount() ; ++i) + size += timestamp(dependencies.keyDeps.txnId(i)); + for (int i = 0 ; i < dependencies.rangeDeps.txnIdCount() ; ++i) + size += timestamp(dependencies.rangeDeps.txnId(i)); + + size += KeyDeps.SerializerSupport.keysToTxnIdsCount(dependencies.keyDeps) * 4L; + size += RangeDeps.SerializerSupport.rangesToTxnIdsCount(dependencies.rangeDeps) * 4L; return size; } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java index 854df207b7..5e4e806b25 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java @@ -15,42 +15,49 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.cassandra.service.accord.serializers; import java.io.IOException; +import com.google.common.primitives.Ints; + import accord.primitives.Deps; +import accord.primitives.KeyDeps; import accord.primitives.Keys; import accord.primitives.PartialDeps; +import accord.primitives.Range; +import accord.primitives.RangeDeps; import accord.primitives.Ranges; import accord.primitives.TxnId; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.accord.TokenRange; -import static accord.primitives.Deps.SerializerSupport.keyToTxnId; -import static accord.primitives.Deps.SerializerSupport.keyToTxnIdCount; +import static accord.primitives.KeyDeps.SerializerSupport.keysToTxnIds; +import static accord.primitives.KeyDeps.SerializerSupport.keysToTxnIdsCount; +import static accord.primitives.RangeDeps.SerializerSupport.rangesToTxnIds; +import static accord.primitives.RangeDeps.SerializerSupport.rangesToTxnIdsCount; +import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt; public abstract class DepsSerializer<D extends Deps> implements IVersionedSerializer<D> { public static final DepsSerializer<Deps> deps = new DepsSerializer<Deps>() { @Override - Deps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, DataInputPlus in, int version) + Deps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, DataInputPlus in, int version) { - return Deps.SerializerSupport.create(keys, txnIds, keyToTxnIds); + return new Deps(keyDeps, rangeDeps); } }; public static final DepsSerializer<PartialDeps> partialDeps = new DepsSerializer<PartialDeps>() { @Override - PartialDeps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, DataInputPlus in, int version) throws IOException + PartialDeps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, DataInputPlus in, int version) throws IOException { 
Ranges covering = KeySerializers.ranges.deserialize(in, version); - return PartialDeps.SerializerSupport.create(covering, keys, txnIds, keyToTxnIds); + return new PartialDeps(covering, keyDeps, rangeDeps); } @Override @@ -63,61 +70,131 @@ public abstract class DepsSerializer<D extends Deps> implements IVersionedSerial @Override public long serializedSize(PartialDeps partialDeps, int version) { - return KeySerializers.ranges.serializedSize(partialDeps.covering, version) - + super.serializedSize(partialDeps, version); + return super.serializedSize(partialDeps, version) + + KeySerializers.ranges.serializedSize(partialDeps.covering, version); } }; @Override public void serialize(D deps, DataOutputPlus out, int version) throws IOException { - Keys keys = deps.keys(); - KeySerializers.keys.serialize(keys, out, version); - - int txnIdCount = deps.txnIdCount(); - out.writeUnsignedVInt32(txnIdCount); - for (int i=0; i<txnIdCount; i++) - CommandSerializers.txnId.serialize(deps.txnId(i), out, version); - - int keyToTxnIdCount = keyToTxnIdCount(deps); - out.writeUnsignedVInt32(keyToTxnIdCount); - for (int i=0; i<keyToTxnIdCount; i++) - out.writeUnsignedVInt32(keyToTxnId(deps, i)); + KeyDeps keyDeps = deps.keyDeps; + { + KeySerializers.keys.serialize(keyDeps.keys(), out, version); + + int txnIdCount = keyDeps.txnIdCount(); + out.writeUnsignedVInt32(txnIdCount); + for (int i = 0; i < txnIdCount; i++) + CommandSerializers.txnId.serialize(keyDeps.txnId(i), out, version); + + int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps); + out.writeUnsignedVInt32(keysToTxnIdsCount); + for (int i = 0; i < keysToTxnIdsCount; i++) + out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i)); + } + + RangeDeps rangeDeps = deps.rangeDeps; + { + int rangeCount = rangeDeps.rangeCount(); + out.writeUnsignedVInt32(rangeCount); + for (int i = 0; i < rangeCount; i++) + TokenRange.serializer.serialize((TokenRange) rangeDeps.range(i), out, version); + + int txnIdCount = rangeDeps.txnIdCount(); + 
out.writeUnsignedVInt32(txnIdCount); + for (int i = 0; i < txnIdCount; i++) + CommandSerializers.txnId.serialize(rangeDeps.txnId(i), out, version); + + int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps); + out.writeUnsignedVInt32(rangesToTxnIdsCount); + for (int i = 0; i < rangesToTxnIdsCount; i++) + out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i)); + } } @Override public D deserialize(DataInputPlus in, int version) throws IOException { - Keys keys = KeySerializers.keys.deserialize(in, version); + KeyDeps keyDeps; + { + Keys keys = KeySerializers.keys.deserialize(in, version); + + int txnIdCount = in.readUnsignedVInt32(); + TxnId[] txnIds = new TxnId[txnIdCount]; + for (int i = 0; i < txnIdCount; i++) + txnIds[i] = CommandSerializers.txnId.deserialize(in, version); + + int keysToTxnIdsCount = in.readUnsignedVInt32(); + int[] keysToTxnIds = new int[keysToTxnIdsCount]; + for (int i = 0; i < keysToTxnIdsCount; i++) + keysToTxnIds[i] = in.readUnsignedVInt32(); - TxnId[] txnIds = new TxnId[in.readUnsignedVInt32()]; - for (int i=0; i<txnIds.length; i++) - txnIds[i] = CommandSerializers.txnId.deserialize(in, version); + keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, keysToTxnIds); + } - int[] keyToTxnIds = new int[in.readUnsignedVInt32()]; - for (int i=0; i<keyToTxnIds.length; i++) - keyToTxnIds[i] = in.readUnsignedVInt32(); + RangeDeps rangeDeps; + { + int rangeCount = Ints.checkedCast(in.readUnsignedVInt32()); + Range[] ranges = new Range[rangeCount]; + for (int i = 0; i < rangeCount; i++) + ranges[i] = TokenRange.serializer.deserialize(in, version); + + int txnIdCount = in.readUnsignedVInt32(); + TxnId[] txnIds = new TxnId[txnIdCount]; + for (int i = 0; i < txnIdCount; i++) + txnIds[i] = CommandSerializers.txnId.deserialize(in, version); + + int rangesToTxnIdsCount = in.readUnsignedVInt32(); + int[] rangesToTxnIds = new int[rangesToTxnIdsCount]; + for (int i = 0; i < rangesToTxnIdsCount; i++) + rangesToTxnIds[i] = in.readUnsignedVInt32(); + + 
rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, rangesToTxnIds); + } - return deserialize(keys, txnIds, keyToTxnIds, in, version); + return deserialize(keyDeps, rangeDeps, in, version); } - abstract D deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, DataInputPlus in, int version) throws IOException; + abstract D deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, DataInputPlus in, int version) throws IOException; @Override public long serializedSize(D deps, int version) { - Keys keys = deps.keys(); - long size = KeySerializers.keys.serializedSize(keys, version); - - int txnIdCount = deps.txnIdCount(); - size += TypeSizes.sizeofUnsignedVInt(txnIdCount); - for (int i=0; i<txnIdCount; i++) - size += CommandSerializers.txnId.serializedSize(deps.txnId(i), version); - - int keyToTxnIdCount = keyToTxnIdCount(deps); - size += TypeSizes.sizeofUnsignedVInt(keyToTxnIdCount); - for (int i=0; i<keyToTxnIdCount; i++) - size += TypeSizes.sizeofUnsignedVInt(keyToTxnId(deps, i)); + long size = 0L; + + KeyDeps keyDeps = deps.keyDeps; + { + size += KeySerializers.keys.serializedSize(keyDeps.keys(), version); + + int txnIdCount = keyDeps.txnIdCount(); + size += sizeofUnsignedVInt(txnIdCount); + for (int i = 0; i < txnIdCount; i++) + size += CommandSerializers.txnId.serializedSize(keyDeps.txnId(i), version); + + int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps); + size += sizeofUnsignedVInt(keysToTxnIdsCount); + for (int i = 0; i < keysToTxnIdsCount; i++) + size += sizeofUnsignedVInt(keysToTxnIds(keyDeps, i)); + } + + RangeDeps rangeDeps = deps.rangeDeps; + { + int rangeCount = rangeDeps.rangeCount(); + size += sizeofUnsignedVInt(rangeCount); + for (int i = 0 ; i < rangeCount ; ++i) + size += TokenRange.serializer.serializedSize((TokenRange) rangeDeps.range(i), version); + + int txnIdCount = rangeDeps.txnIdCount(); + size += sizeofUnsignedVInt(txnIdCount); + for (int i = 0; i < txnIdCount; i++) + size += CommandSerializers.txnId.serializedSize(rangeDeps.txnId(i), 
version); + + int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps); + size += sizeofUnsignedVInt(rangesToTxnIdsCount); + for (int i = 0; i < rangesToTxnIdsCount; i++) + size += sizeofUnsignedVInt(rangesToTxnIds(rangeDeps, i)); + } + return size; } } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java index 9f49138c8b..915bad76d7 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java @@ -81,11 +81,14 @@ public class AccordCommandStoreTest Key key = (Key)depTxn.keys().get(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(depTxn.covering(), false); - builder.add(key, txnId(1, clock.incrementAndGet(), 1)); - PartialDeps dependencies = builder.build(); - QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES (0, 0, 1)"); + PartialDeps dependencies; + try (PartialDeps.Builder builder = PartialDeps.builder(depTxn.covering())) + { + builder.add(key, txnId(1, clock.incrementAndGet(), 1)); + dependencies = builder.build(); + } + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES (0, 0, 1)"); TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1); TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1); TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index 112d666550..1f6a3b2b6f 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -123,9 +123,12 @@ public class AccordCommandTest // check accept TxnId txnId2 = txnId(1, clock.incrementAndGet(), 
1); Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1); - PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(route.covering(), false); - builder.add(key, txnId2); - PartialDeps deps = builder.build(); + PartialDeps deps; + try (PartialDeps.Builder builder = PartialDeps.builder(route.covering())) + { + builder.add(key, txnId2); + deps = builder.build(); + } Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, false, Ballot.ZERO, executeAt, partialTxn.keys(), deps, partialTxn.kind()); commandStore.execute(accept, instance -> { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org