This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 34232d7bd4 IndexOutOfBoundsException while serializing CommandsForKey
34232d7bd4 is described below
commit 34232d7bd45761a1c14c7e91d2f8e5ae183bc8e3
Author: David Capwell <[email protected]>
AuthorDate: Fri May 17 13:50:01 2024 -0700
IndexOutOfBoundsException while serializing CommandsForKey
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19642
---
modules/accord | 2 +-
.../serializers/CommandsForKeySerializer.java | 29 ++++++++++--------
.../serializers/CommandsForKeySerializerTest.java | 35 ++++++++++++++++++++--
3 files changed, 50 insertions(+), 16 deletions(-)
diff --git a/modules/accord b/modules/accord
index d63d06aafe..21cdaf5d28 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit d63d06aafe2e60e57a9651ff6dd491175bbe6916
+Subproject commit 21cdaf5d280965cfdc690d385375635b498bc9f9
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
index dbe2f4845f..a81b62b4a3 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
@@ -385,15 +385,18 @@ public class CommandsForKeySerializer
VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out);
VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() -
unmanagedPendingCommitCount, out);
Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ?
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
- for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
{
- Unmanaged unmanaged = cfk.getUnmanaged(i);
- Invariants.checkState(unmanaged.pending == pending);
- CommandSerializers.txnId.serialize(unmanaged.txnId, out,
ByteBufferAccessor.instance, out.position());
- out.position(out.position() +
CommandSerializers.txnId.serializedSize());
- CommandSerializers.timestamp.serialize(unmanaged.waitingUntil,
out, ByteBufferAccessor.instance, out.position());
- out.position(out.position() +
CommandSerializers.timestamp.serializedSize());
- if (--unmanagedPendingCommitCount == 0) pending =
Unmanaged.Pending.APPLY;
+ int offset = 0;
+ for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
+ {
+ Unmanaged unmanaged = cfk.getUnmanaged(i);
+ Invariants.checkState(unmanaged.pending == pending);
+
+ offset +=
CommandSerializers.txnId.serialize(unmanaged.txnId, out,
ByteBufferAccessor.instance, offset);
+ offset +=
CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, out,
ByteBufferAccessor.instance, offset);
+ if (--unmanagedPendingCommitCount == 0) pending =
Unmanaged.Pending.APPLY;
+ }
+ out.position(out.position() + offset);
}
if ((executeAtCount | missingIdCount) > 0)
@@ -610,15 +613,17 @@ public class CommandsForKeySerializer
{
unmanageds = new Unmanaged[unmanagedCount];
Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ?
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
+ int offset = 0;
for (int i = 0 ; i < unmanagedCount ; ++i)
{
- TxnId txnId = CommandSerializers.txnId.deserialize(in,
ByteBufferAccessor.instance, in.position());
- in.position(in.position() +
CommandSerializers.txnId.serializedSize());
- Timestamp waitingUntil =
CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance,
in.position());
- in.position(in.position() +
CommandSerializers.timestamp.serializedSize());
+ TxnId txnId = CommandSerializers.txnId.deserialize(in,
ByteBufferAccessor.instance, offset);
+ offset += CommandSerializers.txnId.serializedSize();
+ Timestamp waitingUntil =
CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance,
offset);
+ offset += CommandSerializers.timestamp.serializedSize();
unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil);
if (--unmanagedPendingCommitCount == 0) pending =
Unmanaged.Pending.APPLY;
}
+ in.position(in.position() + offset);
}
if (executeAtMasks + missingDepsMasks > 0)
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 405f92dc59..202dc5cb78 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -33,6 +33,7 @@ import java.util.function.IntSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
+import org.apache.commons.lang3.ArrayUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -42,6 +43,7 @@ import accord.local.CommandsForKey;
import accord.local.CommandsForKey.InternalStatus;
import accord.local.Command;
import accord.local.CommandsForKey.TxnInfo;
+import accord.local.CommandsForKey.Unmanaged;
import accord.local.CommonAttributes;
import accord.local.CommonAttributes.Mutable;
import accord.local.Listeners;
@@ -59,6 +61,7 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.AccordGens;
+import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
import accord.utils.SortedArrays;
@@ -465,12 +468,38 @@ public class CommandsForKeySerializerTest
next = txnIdGen.next(rs0);
return next;
}).unique().ofSizeBetween(0, 10).next(rs);
+ Arrays.sort(ids, Comparator.naturalOrder());
TxnInfo[] info = new TxnInfo[ids.length];
for (int i = 0; i < info.length; i++)
info[i] = TxnInfo.create(ids[i],
rs.pick(InternalStatus.values()), ids[i], CommandsForKey.NO_TXNIDS);
- Arrays.sort(info, Comparator.naturalOrder());
- CommandsForKey expected =
CommandsForKey.SerializerSupport.create(pk, info,
CommandsForKey.NO_PENDING_UNMANAGED);
+ Gen<Unmanaged.Pending> pendingGen =
Gens.enums().allMixedDistribution(Unmanaged.Pending.class).next(rs);
+
+ Unmanaged[] unmanaged = Gens.lists(txnIdGen)
+ .unique()
+ .ofSizeBetween(0, 10)
+ .map((rs0, txnIds) ->
txnIds.stream().map(i -> new Unmanaged(pendingGen.next(rs0), i,
i)).toArray(Unmanaged[]::new))
+ .next(rs);
+ Arrays.sort(unmanaged, Comparator.naturalOrder());
+ if (unmanaged.length > 0)
+ {
+ // when registering unmanaged, if the txn is "missing" in TxnInfo we add it
+ List<TxnInfo> missing = new ArrayList<>(unmanaged.length);
+ for (Unmanaged u : unmanaged)
+ {
+ int idx = Arrays.binarySearch(ids, u.txnId);
+ if (idx < 0)
+ missing.add(TxnInfo.create(u.txnId,
InternalStatus.TRANSITIVELY_KNOWN));
+ }
+ if (!missing.isEmpty())
+ {
+ info = ArrayUtils.addAll(info,
missing.toArray(TxnInfo[]::new));
+ Arrays.sort(info, Comparator.naturalOrder());
+ }
+ }
+ else unmanaged = CommandsForKey.NO_PENDING_UNMANAGED;
+
+ CommandsForKey expected =
CommandsForKey.SerializerSupport.create(pk, info, unmanaged);
ByteBuffer buffer =
CommandsForKeySerializer.toBytesWithoutKey(expected);
CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk,
buffer);
@@ -493,4 +522,4 @@ public class CommandsForKeySerializerTest
CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk,
buffer);
Assert.assertEquals(expected, roundTrip);
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]