This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 208af4c658 Introduce RangeDeps
208af4c658 is described below
commit 208af4c658df3779fbca5d7826125d37424deb37
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Jan 9 13:19:16 2023 +0000
Introduce RangeDeps
Refactor Deps into KeyDeps and RangeDeps
patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18173
---
.build/include-accord.sh | 2 +-
.../service/accord/AccordObjectSizes.java | 35 +++--
.../service/accord/serializers/DepsSerializer.java | 163 +++++++++++++++------
.../service/accord/AccordCommandStoreTest.java | 11 +-
.../service/accord/AccordCommandTest.java | 9 +-
5 files changed, 156 insertions(+), 64 deletions(-)
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index eabe4d204b..e7cedab46e 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='5626c7c11400d4cf6d01a8e22517b53a83f5c512'
+accord_branch='686326eedc8d2553c98a30abc81a925be3942b8c'
accord_src="$bin/cassandra-accord"
checkout() {
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 0246ee6780..ca18f67c75 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.service.accord;
-import java.util.Map;
-
import accord.api.Key;
import accord.api.RoutingKey;
import accord.local.Node;
@@ -28,16 +26,17 @@ import accord.primitives.AbstractRanges;
import accord.primitives.Deps;
import accord.primitives.FullKeyRoute;
import accord.primitives.FullRangeRoute;
+import accord.primitives.KeyDeps;
import accord.primitives.Keys;
import accord.primitives.PartialKeyRoute;
import accord.primitives.PartialRangeRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Range;
+import accord.primitives.RangeDeps;
import accord.primitives.Ranges;
import accord.primitives.RoutingKeys;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.primitives.Writes;
import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -61,16 +60,16 @@ public class AccordObjectSizes
return ((AccordRoutingKey) key).estimatedSizeOnHeap();
}
- private static final long EMPTY_KEY_RANGE_SIZE =
ObjectSizes.measure(TokenRange.fullRange(""));
+ private static final long EMPTY_RANGE_SIZE =
ObjectSizes.measure(TokenRange.fullRange(""));
public static long range(Range range)
{
- return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end());
+ return EMPTY_RANGE_SIZE + key(range.start()) + key(range.end());
}
- private static final long EMPTY_KEY_RANGES_SIZE =
ObjectSizes.measure(Ranges.of());
+ private static final long EMPTY_RANGES_SIZE =
ObjectSizes.measure(Ranges.of());
public static long ranges(Ranges ranges)
{
- long size = EMPTY_KEY_RANGES_SIZE;
+ long size = EMPTY_RANGES_SIZE;
size += ObjectSizes.sizeOfReferenceArray(ranges.size());
// TODO: many ranges are fixed size, can compute by multiplication
for (int i = 0, mi = ranges.size() ; i < mi ; i++)
@@ -196,12 +195,22 @@ public class AccordObjectSizes
private static final long EMPTY_DEPS_SIZE =
ObjectSizes.measureDeep(Deps.NONE);
public static long dependencies(Deps dependencies)
{
- long size = EMPTY_DEPS_SIZE;
- for (Map.Entry<Key, TxnId> entry : dependencies)
- {
- size += key(entry.getKey());
- size += timestamp(entry.getValue());
- }
+ // TODO (expected): this doesn't measure the backing arrays, is
inefficient;
+ // doesn't account for txnIdToKeys, txnIdToRanges, and searchable
fields;
+ // fix to accunt for, in case caching isn't redone
+ long size = EMPTY_DEPS_SIZE - EMPTY_KEYS_SIZE -
ObjectSizes.sizeOfReferenceArray(0);
+ size += keys(dependencies.keyDeps.keys());
+ for (int i = 0 ; i < dependencies.rangeDeps.rangeCount() ; ++i)
+ size += range(dependencies.rangeDeps.range(i));
+ size +=
ObjectSizes.sizeOfReferenceArray(dependencies.rangeDeps.rangeCount());
+
+ for (int i = 0 ; i < dependencies.keyDeps.txnIdCount() ; ++i)
+ size += timestamp(dependencies.keyDeps.txnId(i));
+ for (int i = 0 ; i < dependencies.rangeDeps.txnIdCount() ; ++i)
+ size += timestamp(dependencies.rangeDeps.txnId(i));
+
+ size +=
KeyDeps.SerializerSupport.keysToTxnIdsCount(dependencies.keyDeps) * 4L;
+ size +=
RangeDeps.SerializerSupport.rangesToTxnIdsCount(dependencies.rangeDeps) * 4L;
return size;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
index 854df207b7..5e4e806b25 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
@@ -15,42 +15,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
+import com.google.common.primitives.Ints;
+
import accord.primitives.Deps;
+import accord.primitives.KeyDeps;
import accord.primitives.Keys;
import accord.primitives.PartialDeps;
+import accord.primitives.Range;
+import accord.primitives.RangeDeps;
import accord.primitives.Ranges;
import accord.primitives.TxnId;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
-import static accord.primitives.Deps.SerializerSupport.keyToTxnId;
-import static accord.primitives.Deps.SerializerSupport.keyToTxnIdCount;
+import static accord.primitives.KeyDeps.SerializerSupport.keysToTxnIds;
+import static accord.primitives.KeyDeps.SerializerSupport.keysToTxnIdsCount;
+import static accord.primitives.RangeDeps.SerializerSupport.rangesToTxnIds;
+import static
accord.primitives.RangeDeps.SerializerSupport.rangesToTxnIdsCount;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
public abstract class DepsSerializer<D extends Deps> implements
IVersionedSerializer<D>
{
public static final DepsSerializer<Deps> deps = new DepsSerializer<Deps>()
{
@Override
- Deps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds,
DataInputPlus in, int version)
+ Deps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, DataInputPlus
in, int version)
{
- return Deps.SerializerSupport.create(keys, txnIds, keyToTxnIds);
+ return new Deps(keyDeps, rangeDeps);
}
};
public static final DepsSerializer<PartialDeps> partialDeps = new
DepsSerializer<PartialDeps>()
{
@Override
- PartialDeps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds,
DataInputPlus in, int version) throws IOException
+ PartialDeps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps,
DataInputPlus in, int version) throws IOException
{
Ranges covering = KeySerializers.ranges.deserialize(in, version);
- return PartialDeps.SerializerSupport.create(covering, keys,
txnIds, keyToTxnIds);
+ return new PartialDeps(covering, keyDeps, rangeDeps);
}
@Override
@@ -63,61 +70,131 @@ public abstract class DepsSerializer<D extends Deps>
implements IVersionedSerial
@Override
public long serializedSize(PartialDeps partialDeps, int version)
{
- return KeySerializers.ranges.serializedSize(partialDeps.covering,
version)
- + super.serializedSize(partialDeps, version);
+ return super.serializedSize(partialDeps, version)
+ + KeySerializers.ranges.serializedSize(partialDeps.covering,
version);
}
};
@Override
public void serialize(D deps, DataOutputPlus out, int version) throws
IOException
{
- Keys keys = deps.keys();
- KeySerializers.keys.serialize(keys, out, version);
-
- int txnIdCount = deps.txnIdCount();
- out.writeUnsignedVInt32(txnIdCount);
- for (int i=0; i<txnIdCount; i++)
- CommandSerializers.txnId.serialize(deps.txnId(i), out, version);
-
- int keyToTxnIdCount = keyToTxnIdCount(deps);
- out.writeUnsignedVInt32(keyToTxnIdCount);
- for (int i=0; i<keyToTxnIdCount; i++)
- out.writeUnsignedVInt32(keyToTxnId(deps, i));
+ KeyDeps keyDeps = deps.keyDeps;
+ {
+ KeySerializers.keys.serialize(keyDeps.keys(), out, version);
+
+ int txnIdCount = keyDeps.txnIdCount();
+ out.writeUnsignedVInt32(txnIdCount);
+ for (int i = 0; i < txnIdCount; i++)
+ CommandSerializers.txnId.serialize(keyDeps.txnId(i), out,
version);
+
+ int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
+ out.writeUnsignedVInt32(keysToTxnIdsCount);
+ for (int i = 0; i < keysToTxnIdsCount; i++)
+ out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+ }
+
+ RangeDeps rangeDeps = deps.rangeDeps;
+ {
+ int rangeCount = rangeDeps.rangeCount();
+ out.writeUnsignedVInt32(rangeCount);
+ for (int i = 0; i < rangeCount; i++)
+ TokenRange.serializer.serialize((TokenRange)
rangeDeps.range(i), out, version);
+
+ int txnIdCount = rangeDeps.txnIdCount();
+ out.writeUnsignedVInt32(txnIdCount);
+ for (int i = 0; i < txnIdCount; i++)
+ CommandSerializers.txnId.serialize(rangeDeps.txnId(i), out,
version);
+
+ int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
+ out.writeUnsignedVInt32(rangesToTxnIdsCount);
+ for (int i = 0; i < rangesToTxnIdsCount; i++)
+ out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+ }
}
@Override
public D deserialize(DataInputPlus in, int version) throws IOException
{
- Keys keys = KeySerializers.keys.deserialize(in, version);
+ KeyDeps keyDeps;
+ {
+ Keys keys = KeySerializers.keys.deserialize(in, version);
+
+ int txnIdCount = in.readUnsignedVInt32();
+ TxnId[] txnIds = new TxnId[txnIdCount];
+ for (int i = 0; i < txnIdCount; i++)
+ txnIds[i] = CommandSerializers.txnId.deserialize(in, version);
+
+ int keysToTxnIdsCount = in.readUnsignedVInt32();
+ int[] keysToTxnIds = new int[keysToTxnIdsCount];
+ for (int i = 0; i < keysToTxnIdsCount; i++)
+ keysToTxnIds[i] = in.readUnsignedVInt32();
- TxnId[] txnIds = new TxnId[in.readUnsignedVInt32()];
- for (int i=0; i<txnIds.length; i++)
- txnIds[i] = CommandSerializers.txnId.deserialize(in, version);
+ keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds,
keysToTxnIds);
+ }
- int[] keyToTxnIds = new int[in.readUnsignedVInt32()];
- for (int i=0; i<keyToTxnIds.length; i++)
- keyToTxnIds[i] = in.readUnsignedVInt32();
+ RangeDeps rangeDeps;
+ {
+ int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
+ Range[] ranges = new Range[rangeCount];
+ for (int i = 0; i < rangeCount; i++)
+ ranges[i] = TokenRange.serializer.deserialize(in, version);
+
+ int txnIdCount = in.readUnsignedVInt32();
+ TxnId[] txnIds = new TxnId[txnIdCount];
+ for (int i = 0; i < txnIdCount; i++)
+ txnIds[i] = CommandSerializers.txnId.deserialize(in, version);
+
+ int rangesToTxnIdsCount = in.readUnsignedVInt32();
+ int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
+ for (int i = 0; i < rangesToTxnIdsCount; i++)
+ rangesToTxnIds[i] = in.readUnsignedVInt32();
+
+ rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds,
rangesToTxnIds);
+ }
- return deserialize(keys, txnIds, keyToTxnIds, in, version);
+ return deserialize(keyDeps, rangeDeps, in, version);
}
- abstract D deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds,
DataInputPlus in, int version) throws IOException;
+ abstract D deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, DataInputPlus
in, int version) throws IOException;
@Override
public long serializedSize(D deps, int version)
{
- Keys keys = deps.keys();
- long size = KeySerializers.keys.serializedSize(keys, version);
-
- int txnIdCount = deps.txnIdCount();
- size += TypeSizes.sizeofUnsignedVInt(txnIdCount);
- for (int i=0; i<txnIdCount; i++)
- size += CommandSerializers.txnId.serializedSize(deps.txnId(i),
version);
-
- int keyToTxnIdCount = keyToTxnIdCount(deps);
- size += TypeSizes.sizeofUnsignedVInt(keyToTxnIdCount);
- for (int i=0; i<keyToTxnIdCount; i++)
- size += TypeSizes.sizeofUnsignedVInt(keyToTxnId(deps, i));
+ long size = 0L;
+
+ KeyDeps keyDeps = deps.keyDeps;
+ {
+ size += KeySerializers.keys.serializedSize(keyDeps.keys(),
version);
+
+ int txnIdCount = keyDeps.txnIdCount();
+ size += sizeofUnsignedVInt(txnIdCount);
+ for (int i = 0; i < txnIdCount; i++)
+ size +=
CommandSerializers.txnId.serializedSize(keyDeps.txnId(i), version);
+
+ int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
+ size += sizeofUnsignedVInt(keysToTxnIdsCount);
+ for (int i = 0; i < keysToTxnIdsCount; i++)
+ size += sizeofUnsignedVInt(keysToTxnIds(keyDeps, i));
+ }
+
+ RangeDeps rangeDeps = deps.rangeDeps;
+ {
+ int rangeCount = rangeDeps.rangeCount();
+ size += sizeofUnsignedVInt(rangeCount);
+ for (int i = 0 ; i < rangeCount ; ++i)
+ size += TokenRange.serializer.serializedSize((TokenRange)
rangeDeps.range(i), version);
+
+ int txnIdCount = rangeDeps.txnIdCount();
+ size += sizeofUnsignedVInt(txnIdCount);
+ for (int i = 0; i < txnIdCount; i++)
+ size +=
CommandSerializers.txnId.serializedSize(rangeDeps.txnId(i), version);
+
+ int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
+ size += sizeofUnsignedVInt(rangesToTxnIdsCount);
+ for (int i = 0; i < rangesToTxnIdsCount; i++)
+ size += sizeofUnsignedVInt(rangesToTxnIds(rangeDeps, i));
+ }
+
return size;
}
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 9f49138c8b..915bad76d7 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -81,11 +81,14 @@ public class AccordCommandStoreTest
Key key = (Key)depTxn.keys().get(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- PartialDeps.OrderedBuilder builder =
PartialDeps.orderedBuilder(depTxn.covering(), false);
- builder.add(key, txnId(1, clock.incrementAndGet(), 1));
- PartialDeps dependencies = builder.build();
- QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES
(0, 0, 1)");
+ PartialDeps dependencies;
+ try (PartialDeps.Builder builder =
PartialDeps.builder(depTxn.covering()))
+ {
+ builder.add(key, txnId(1, clock.incrementAndGet(), 1));
+ dependencies = builder.build();
+ }
+ QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES
(0, 0, 1)");
TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1);
TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1);
TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1);
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 112d666550..1f6a3b2b6f 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -123,9 +123,12 @@ public class AccordCommandTest
// check accept
TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1);
- PartialDeps.OrderedBuilder builder =
PartialDeps.orderedBuilder(route.covering(), false);
- builder.add(key, txnId2);
- PartialDeps deps = builder.build();
+ PartialDeps deps;
+ try (PartialDeps.Builder builder =
PartialDeps.builder(route.covering()))
+ {
+ builder.add(key, txnId2);
+ deps = builder.build();
+ }
Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1,
false, Ballot.ZERO, executeAt, partialTxn.keys(), deps, partialTxn.kind());
commandStore.execute(accept, instance -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]