This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 208af4c658 Introduce RangeDeps
208af4c658 is described below

commit 208af4c658df3779fbca5d7826125d37424deb37
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Mon Jan 9 13:19:16 2023 +0000

    Introduce RangeDeps
    
    Refactor Deps into KeyDeps and RangeDeps
    
    patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18173
---
 .build/include-accord.sh                           |   2 +-
 .../service/accord/AccordObjectSizes.java          |  35 +++--
 .../service/accord/serializers/DepsSerializer.java | 163 +++++++++++++++------
 .../service/accord/AccordCommandStoreTest.java     |  11 +-
 .../service/accord/AccordCommandTest.java          |   9 +-
 5 files changed, 156 insertions(+), 64 deletions(-)

diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index eabe4d204b..e7cedab46e 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
 accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='5626c7c11400d4cf6d01a8e22517b53a83f5c512'
+accord_branch='686326eedc8d2553c98a30abc81a925be3942b8c'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java 
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 0246ee6780..ca18f67c75 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.Map;
-
 import accord.api.Key;
 import accord.api.RoutingKey;
 import accord.local.Node;
@@ -28,16 +26,17 @@ import accord.primitives.AbstractRanges;
 import accord.primitives.Deps;
 import accord.primitives.FullKeyRoute;
 import accord.primitives.FullRangeRoute;
+import accord.primitives.KeyDeps;
 import accord.primitives.Keys;
 import accord.primitives.PartialKeyRoute;
 import accord.primitives.PartialRangeRoute;
 import accord.primitives.PartialTxn;
 import accord.primitives.Range;
+import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
 import accord.primitives.Writes;
 import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -61,16 +60,16 @@ public class AccordObjectSizes
         return ((AccordRoutingKey) key).estimatedSizeOnHeap();
     }
 
-    private static final long EMPTY_KEY_RANGE_SIZE = 
ObjectSizes.measure(TokenRange.fullRange(""));
+    private static final long EMPTY_RANGE_SIZE = 
ObjectSizes.measure(TokenRange.fullRange(""));
     public static long range(Range range)
     {
-        return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end());
+        return EMPTY_RANGE_SIZE + key(range.start()) + key(range.end());
     }
 
-    private static final long EMPTY_KEY_RANGES_SIZE = 
ObjectSizes.measure(Ranges.of());
+    private static final long EMPTY_RANGES_SIZE = 
ObjectSizes.measure(Ranges.of());
     public static long ranges(Ranges ranges)
     {
-        long size = EMPTY_KEY_RANGES_SIZE;
+        long size = EMPTY_RANGES_SIZE;
         size += ObjectSizes.sizeOfReferenceArray(ranges.size());
         // TODO: many ranges are fixed size, can compute by multiplication
         for (int i = 0, mi = ranges.size() ; i < mi ; i++)
@@ -196,12 +195,22 @@ public class AccordObjectSizes
     private static final long EMPTY_DEPS_SIZE = 
ObjectSizes.measureDeep(Deps.NONE);
     public static long dependencies(Deps dependencies)
     {
-        long size = EMPTY_DEPS_SIZE;
-        for (Map.Entry<Key, TxnId> entry : dependencies)
-        {
-            size += key(entry.getKey());
-            size += timestamp(entry.getValue());
-        }
+        // TODO (expected): this doesn't measure the backing arrays, is 
inefficient;
+        //      doesn't account for txnIdToKeys, txnIdToRanges, and searchable 
fields;
+        //      fix to account for, in case caching isn't redone
+        long size = EMPTY_DEPS_SIZE - EMPTY_KEYS_SIZE - 
ObjectSizes.sizeOfReferenceArray(0);
+        size += keys(dependencies.keyDeps.keys());
+        for (int i = 0 ; i < dependencies.rangeDeps.rangeCount() ; ++i)
+            size += range(dependencies.rangeDeps.range(i));
+        size += 
ObjectSizes.sizeOfReferenceArray(dependencies.rangeDeps.rangeCount());
+
+        for (int i = 0 ; i < dependencies.keyDeps.txnIdCount() ; ++i)
+            size += timestamp(dependencies.keyDeps.txnId(i));
+        for (int i = 0 ; i < dependencies.rangeDeps.txnIdCount() ; ++i)
+            size += timestamp(dependencies.rangeDeps.txnId(i));
+
+        size += 
KeyDeps.SerializerSupport.keysToTxnIdsCount(dependencies.keyDeps) * 4L;
+        size += 
RangeDeps.SerializerSupport.rangesToTxnIdsCount(dependencies.rangeDeps) * 4L;
         return size;
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java 
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
index 854df207b7..5e4e806b25 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
@@ -15,42 +15,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
 
+import com.google.common.primitives.Ints;
+
 import accord.primitives.Deps;
+import accord.primitives.KeyDeps;
 import accord.primitives.Keys;
 import accord.primitives.PartialDeps;
+import accord.primitives.Range;
+import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
 import accord.primitives.TxnId;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
 
-import static accord.primitives.Deps.SerializerSupport.keyToTxnId;
-import static accord.primitives.Deps.SerializerSupport.keyToTxnIdCount;
+import static accord.primitives.KeyDeps.SerializerSupport.keysToTxnIds;
+import static accord.primitives.KeyDeps.SerializerSupport.keysToTxnIdsCount;
+import static accord.primitives.RangeDeps.SerializerSupport.rangesToTxnIds;
+import static 
accord.primitives.RangeDeps.SerializerSupport.rangesToTxnIdsCount;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 
 public abstract class DepsSerializer<D extends Deps> implements 
IVersionedSerializer<D>
 {
     public static final DepsSerializer<Deps> deps = new DepsSerializer<Deps>()
     {
         @Override
-        Deps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, 
DataInputPlus in, int version)
+        Deps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, DataInputPlus 
in, int version)
         {
-            return Deps.SerializerSupport.create(keys, txnIds, keyToTxnIds);
+            return new Deps(keyDeps, rangeDeps);
         }
     };
 
     public static final DepsSerializer<PartialDeps> partialDeps = new 
DepsSerializer<PartialDeps>()
     {
         @Override
-        PartialDeps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, 
DataInputPlus in, int version) throws IOException
+        PartialDeps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, 
DataInputPlus in, int version) throws IOException
         {
             Ranges covering = KeySerializers.ranges.deserialize(in, version);
-            return PartialDeps.SerializerSupport.create(covering, keys, 
txnIds, keyToTxnIds);
+            return new PartialDeps(covering, keyDeps, rangeDeps);
         }
 
         @Override
@@ -63,61 +70,131 @@ public abstract class DepsSerializer<D extends Deps> 
implements IVersionedSerial
         @Override
         public long serializedSize(PartialDeps partialDeps, int version)
         {
-            return KeySerializers.ranges.serializedSize(partialDeps.covering, 
version)
-                   + super.serializedSize(partialDeps, version);
+            return super.serializedSize(partialDeps, version)
+                 + KeySerializers.ranges.serializedSize(partialDeps.covering, 
version);
         }
     };
 
     @Override
     public void serialize(D deps, DataOutputPlus out, int version) throws 
IOException
     {
-        Keys keys = deps.keys();
-        KeySerializers.keys.serialize(keys, out, version);
-
-        int txnIdCount = deps.txnIdCount();
-        out.writeUnsignedVInt32(txnIdCount);
-        for (int i=0; i<txnIdCount; i++)
-            CommandSerializers.txnId.serialize(deps.txnId(i), out, version);
-
-        int keyToTxnIdCount = keyToTxnIdCount(deps);
-        out.writeUnsignedVInt32(keyToTxnIdCount);
-        for (int i=0; i<keyToTxnIdCount; i++)
-            out.writeUnsignedVInt32(keyToTxnId(deps, i));
+        KeyDeps keyDeps = deps.keyDeps;
+        {
+            KeySerializers.keys.serialize(keyDeps.keys(), out, version);
+
+            int txnIdCount = keyDeps.txnIdCount();
+            out.writeUnsignedVInt32(txnIdCount);
+            for (int i = 0; i < txnIdCount; i++)
+                CommandSerializers.txnId.serialize(keyDeps.txnId(i), out, 
version);
+
+            int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
+            out.writeUnsignedVInt32(keysToTxnIdsCount);
+            for (int i = 0; i < keysToTxnIdsCount; i++)
+                out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+        }
+
+        RangeDeps rangeDeps = deps.rangeDeps;
+        {
+            int rangeCount = rangeDeps.rangeCount();
+            out.writeUnsignedVInt32(rangeCount);
+            for (int i = 0; i < rangeCount; i++)
+                TokenRange.serializer.serialize((TokenRange) 
rangeDeps.range(i), out, version);
+
+            int txnIdCount = rangeDeps.txnIdCount();
+            out.writeUnsignedVInt32(txnIdCount);
+            for (int i = 0; i < txnIdCount; i++)
+                CommandSerializers.txnId.serialize(rangeDeps.txnId(i), out, 
version);
+
+            int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
+            out.writeUnsignedVInt32(rangesToTxnIdsCount);
+            for (int i = 0; i < rangesToTxnIdsCount; i++)
+                out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+        }
     }
 
     @Override
     public D deserialize(DataInputPlus in, int version) throws IOException
     {
-        Keys keys = KeySerializers.keys.deserialize(in, version);
+        KeyDeps keyDeps;
+        {
+            Keys keys = KeySerializers.keys.deserialize(in, version);
+
+            int txnIdCount = in.readUnsignedVInt32();
+            TxnId[] txnIds = new TxnId[txnIdCount];
+            for (int i = 0; i < txnIdCount; i++)
+                txnIds[i] = CommandSerializers.txnId.deserialize(in, version);
+
+            int keysToTxnIdsCount = in.readUnsignedVInt32();
+            int[] keysToTxnIds = new int[keysToTxnIdsCount];
+            for (int i = 0; i < keysToTxnIdsCount; i++)
+                keysToTxnIds[i] = in.readUnsignedVInt32();
 
-        TxnId[] txnIds = new TxnId[in.readUnsignedVInt32()];
-        for (int i=0; i<txnIds.length; i++)
-            txnIds[i] = CommandSerializers.txnId.deserialize(in, version);
+            keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, 
keysToTxnIds);
+        }
 
-        int[] keyToTxnIds = new int[in.readUnsignedVInt32()];
-        for (int i=0; i<keyToTxnIds.length; i++)
-            keyToTxnIds[i] = in.readUnsignedVInt32();
+        RangeDeps rangeDeps;
+        {
+            int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
+            Range[] ranges = new Range[rangeCount];
+            for (int i = 0; i < rangeCount; i++)
+                ranges[i] = TokenRange.serializer.deserialize(in, version);
+
+            int txnIdCount = in.readUnsignedVInt32();
+            TxnId[] txnIds = new TxnId[txnIdCount];
+            for (int i = 0; i < txnIdCount; i++)
+                txnIds[i] = CommandSerializers.txnId.deserialize(in, version);
+
+            int rangesToTxnIdsCount = in.readUnsignedVInt32();
+            int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
+            for (int i = 0; i < rangesToTxnIdsCount; i++)
+                rangesToTxnIds[i] = in.readUnsignedVInt32();
+
+            rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, 
rangesToTxnIds);
+        }
 
-        return deserialize(keys, txnIds, keyToTxnIds, in, version);
+        return deserialize(keyDeps, rangeDeps, in, version);
     }
 
-    abstract D deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, 
DataInputPlus in, int version) throws IOException;
+    abstract D deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, DataInputPlus 
in, int version) throws IOException;
 
     @Override
     public long serializedSize(D deps, int version)
     {
-        Keys keys = deps.keys();
-        long size = KeySerializers.keys.serializedSize(keys, version);
-
-        int txnIdCount = deps.txnIdCount();
-        size += TypeSizes.sizeofUnsignedVInt(txnIdCount);
-        for (int i=0; i<txnIdCount; i++)
-            size += CommandSerializers.txnId.serializedSize(deps.txnId(i), 
version);
-
-        int keyToTxnIdCount = keyToTxnIdCount(deps);
-        size += TypeSizes.sizeofUnsignedVInt(keyToTxnIdCount);
-        for (int i=0; i<keyToTxnIdCount; i++)
-            size += TypeSizes.sizeofUnsignedVInt(keyToTxnId(deps, i));
+        long size = 0L;
+
+        KeyDeps keyDeps = deps.keyDeps;
+        {
+            size += KeySerializers.keys.serializedSize(keyDeps.keys(), 
version);
+
+            int txnIdCount = keyDeps.txnIdCount();
+            size += sizeofUnsignedVInt(txnIdCount);
+            for (int i = 0; i < txnIdCount; i++)
+                size += 
CommandSerializers.txnId.serializedSize(keyDeps.txnId(i), version);
+
+            int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
+            size += sizeofUnsignedVInt(keysToTxnIdsCount);
+            for (int i = 0; i < keysToTxnIdsCount; i++)
+                size += sizeofUnsignedVInt(keysToTxnIds(keyDeps, i));
+        }
+
+        RangeDeps rangeDeps = deps.rangeDeps;
+        {
+            int rangeCount = rangeDeps.rangeCount();
+            size += sizeofUnsignedVInt(rangeCount);
+            for (int i = 0 ; i < rangeCount ; ++i)
+                size += TokenRange.serializer.serializedSize((TokenRange) 
rangeDeps.range(i), version);
+
+            int txnIdCount = rangeDeps.txnIdCount();
+            size += sizeofUnsignedVInt(txnIdCount);
+            for (int i = 0; i < txnIdCount; i++)
+                size += 
CommandSerializers.txnId.serializedSize(rangeDeps.txnId(i), version);
+
+            int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
+            size += sizeofUnsignedVInt(rangesToTxnIdsCount);
+            for (int i = 0; i < rangesToTxnIdsCount; i++)
+                size += sizeofUnsignedVInt(rangesToTxnIds(rangeDeps, i));
+        }
+
         return size;
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 9f49138c8b..915bad76d7 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -81,11 +81,14 @@ public class AccordCommandStoreTest
         Key key = (Key)depTxn.keys().get(0);
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
 
-        PartialDeps.OrderedBuilder builder = 
PartialDeps.orderedBuilder(depTxn.covering(), false);
-        builder.add(key, txnId(1, clock.incrementAndGet(), 1));
-        PartialDeps dependencies = builder.build();
-        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES 
(0, 0, 1)");
+        PartialDeps dependencies;
+        try (PartialDeps.Builder builder = 
PartialDeps.builder(depTxn.covering()))
+        {
+            builder.add(key, txnId(1, clock.incrementAndGet(), 1));
+            dependencies = builder.build();
+        }
 
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES 
(0, 0, 1)");
         TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1);
         TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1);
         TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 112d666550..1f6a3b2b6f 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -123,9 +123,12 @@ public class AccordCommandTest
         // check accept
         TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
         Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1);
-        PartialDeps.OrderedBuilder builder = 
PartialDeps.orderedBuilder(route.covering(), false);
-        builder.add(key, txnId2);
-        PartialDeps deps = builder.build();
+        PartialDeps deps;
+        try (PartialDeps.Builder builder = 
PartialDeps.builder(route.covering()))
+        {
+            builder.add(key, txnId2);
+            deps = builder.build();
+        }
         Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, 
false, Ballot.ZERO, executeAt, partialTxn.keys(), deps, partialTxn.kind());
 
         commandStore.execute(accept, instance -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to