This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f43d9d0f7f Add support in the binary protocol to allow transactions to
have multiple conditions
f43d9d0f7f is described below
commit f43d9d0f7f090bcd3679097a453ff0ae2b33ee8e
Author: David Capwell <[email protected]>
AuthorDate: Mon Nov 10 12:06:42 2025 -0800
Add support in the binary protocol to allow transactions to have multiple
conditions
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-20883
---
CHANGES.txt | 1 +
...ymmetricParameterisedUnversionedSerializer.java | 5 +
.../cassandra/service/accord/TokenRange.java | 5 +
.../accord/serializers/SerializePacked.java | 83 ++-
.../service/accord/serializers/TableMetadatas.java | 11 +
.../cassandra/service/accord/txn/AccordUpdate.java | 2 +-
.../cassandra/service/accord/txn/TxnQuery.java | 2 +-
.../cassandra/service/accord/txn/TxnUpdate.java | 789 ++++++++++++++++-----
.../cassandra/service/accord/txn/TxnWrite.java | 59 +-
.../apache/cassandra/utils/ArraySerializers.java | 32 +
.../cassandra/utils/CollectionSerializers.java | 2 +-
test/unit/org/apache/cassandra/io/Serializers.java | 10 +-
.../serializers/CommandsForKeySerializerTest.java | 3 +-
.../accord/serializers/SerializePackedTest.java | 169 +++++
.../service/accord/txn/TxnUpdateTest.java | 239 +++++++
.../apache/cassandra/utils/AccordGenerators.java | 7 +-
.../cassandra/utils/LargeBitSetSerializerTest.java | 59 --
.../utils/SimpleBitSetSerializersTest.java | 113 +++
18 files changed, 1333 insertions(+), 258 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index b4996c26ba..332107e491 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add support in the binary protocol to allow transactions to have multiple
conditions (CASSANDRA-20883)
* Enable CQLSSTableWriter to create SSTables compressed with a dictionary
(CASSANDRA-20938)
* Support ZSTD dictionary compression (CASSANDRA-17021)
* Fix ExceptionsTable when stacktrace has zero elements (CASSANDRA-20992)
diff --git
a/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java
b/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java
index 69b1bd2e0c..56841a0bef 100644
---
a/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java
+++
b/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java
@@ -74,5 +74,10 @@ public interface
AsymmetricParameterisedUnversionedSerializer<In, P, Out>
}
}
+ default void skip(P p, DataInputPlus in) throws IOException
+ {
+ deserialize(p, in);
+ }
+
long serializedSize(In t, P p);
}
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java
b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 91be38ec2f..9c00c4dcdf 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -54,6 +54,11 @@ public class TokenRange extends Range.EndInclusive
return new TokenRange(start, end);
}
+ public static TokenRange create(TableId tableId, Token start, Token end)
+ {
+ return new TokenRange(new TokenKey(tableId, start), new
TokenKey(tableId, end));
+ }
+
public static TokenRange createUnsafe(TokenKey start, TokenKey end)
{
return new TokenRange(start, end);
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java
b/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java
index b7262c1265..67d66a7185 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import accord.utils.BitUtils;
import accord.utils.Invariants;
import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -30,20 +31,91 @@ import org.apache.cassandra.io.util.DataOutputPlus;
* A set of simple utilities to quickly serialize/deserialize arrays/lists of
values that each require <= 64 bits to represent.
* These are packed into an "array" of fixed bit width, so that the total size
consumed is ceil((bits*elements)/8).
* This can (in future) be read directly without deserialization, by indexing
into the byte stream directly.
+ * <p/>
+ * The serialized form is optimized for values in the range 0 to 256
(negative values are rejected), and should produce
+ * output smaller than or equal to vint serialization; when values are larger than
256, the packing can produce 1 extra
+ * serialized byte. Serialization is still safe in these cases, and faster to skip.
*/
public class SerializePacked
{
- public static void serializePackedInts(int[] vs, int from, int to, int
max, DataOutputPlus out) throws IOException
+ public static void serializePackedSortedIntsAndLength(int[] vs,
DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(vs.length);
+ serializePackedSortedInts(vs, out);
+ }
+
+ public static void serializePackedSortedInts(int[] vs, DataOutputPlus out)
throws IOException
+ {
+ if (vs.length == 0)
+ return;
+
+ int last = vs[vs.length - 1];
+ Invariants.require(last >= 0,
+ () -> String.format("Found a negative value at offset %d;
value %d", (Object) (vs.length - 1), (Object) last));
+ out.writeUnsignedVInt32(last);
+ serializePackedInts(vs, 0, vs.length - 1, last, out);
+ }
+
+ public static int[] deserializePackedSortedIntsAndLength(DataInputPlus in)
throws IOException
+ {
+ return deserializePackedSortedInts(in.readUnsignedVInt32(), in);
+ }
+
+ public static int[] deserializePackedSortedInts(int length, DataInputPlus
in) throws IOException
+ {
+ if (length == 0)
+ return new int[0];
+
+ int last = in.readUnsignedVInt32();
+ int[] vs = new int[length];
+ deserializePackedInts(vs, 0, length - 1, last, in);
+ vs[length - 1] = last;
+ return vs;
+ }
+
+ public static void skipPackedSortedIntsAndLength(DataInputPlus in) throws
IOException
+ {
+ skipPackedSortedInts(in.readUnsignedVInt32(), in);
+ }
+
+ public static void skipPackedSortedInts(int length, DataInputPlus in)
throws IOException
+ {
+ if (length > 0)
+ {
+ int last = in.readUnsignedVInt32();
+ skipPackedInts(0, length - 1, last, in);
+ }
+ }
+
+ public static long serializedSizeOfPackedSortedIntsAndLength(int[] vs)
+ {
+ return TypeSizes.sizeofUnsignedVInt(vs.length) +
serializedSizeOfPackedSortedInts(vs);
+ }
+
+ public static long serializedSizeOfPackedSortedInts(int[] vs)
+ {
+ if (vs.length == 0)
+ return 0;
+ int last = vs[vs.length - 1];
+ return TypeSizes.sizeofUnsignedVInt(last) +
serializedPackedSize(vs.length - 1, last);
+ }
+
+ public static void serializePackedInts(int[] vs, int from, int to, long
max, DataOutputPlus out) throws IOException
{
serializePacked((in, i) -> in[i], vs, from, to, max, out);
}
- public static void deserializePackedInts(int[] vs, int from, int to, int
max, DataInputPlus in) throws IOException
+ public static void deserializePackedInts(int[] vs, int from, int to, long
max, DataInputPlus in) throws IOException
{
deserializePacked((out, i, v) -> out[i] = (int)v, vs, from, to, max,
in);
}
- public static long serializedPackedIntsSize(int[] vs, int from, int to,
int max)
+ public static void skipPackedInts(int from, int to, long max,
DataInputPlus in) throws IOException
+ {
+ in.skipBytesFully(serializedPackedSize(to - from, max));
+ }
+
+ public static long serializedPackedIntsSize(int[] vs, int from, int to,
long max)
{
return serializedPackedSize(to - from, max);
}
@@ -60,12 +132,15 @@ public class SerializePacked
if (bitsPerEntry == 0)
return;
+ long outOfRange = -1L << bitsPerEntry;
long buffer = 0L;
int bufferCount = 0;
for (int i = from; i < to; i++)
{
long v = adapter.get(in, i);
- Invariants.require(v <= max);
+ int finalI = i;
+ Invariants.require(v >= 0 && (v & outOfRange) == 0,
+ () -> String.format(v < 0 ? "Found a negative value at
offset %d; value %d" : "Value out of range at offset %d; value %d", (Object)
finalI, (Object) v));
buffer |= v << bufferCount;
bufferCount = bufferCount + bitsPerEntry;
if (bufferCount >= 64)
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java
b/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java
index cec50a0a10..e022d193ff 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.util.AbstractList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
import accord.utils.Invariants;
import accord.utils.SortedArrays;
@@ -99,6 +102,14 @@ public abstract class TableMetadatas extends
AbstractList<TableId>
return new One(metadata);
}
+ @VisibleForTesting
+ public static Complete of(List<TableMetadata> values)
+ {
+ Collector collector = new Collector();
+ collector.addAll(values);
+ return collector.build();
+ }
+
public static Complete ofSortedUnique(TableMetadata ... metadatas)
{
if (metadatas.length == 0)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java
b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java
index 89536d2058..f6d50763a4 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java
@@ -72,7 +72,7 @@ public abstract class AccordUpdate implements Update
}
- public boolean checkCondition(Data data)
+ public boolean checkAnyConditionMatch(Data data)
{
throw new UnsupportedOperationException();
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
index b0bcc5c58b..e380b87fe6 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
@@ -119,7 +119,7 @@ public abstract class TxnQuery implements Query
AccordUpdate accordUpdate = (AccordUpdate)update;
TxnData txnData = (TxnData)data;
- boolean conditionCheck = accordUpdate.checkCondition(data);
+ boolean conditionCheck = accordUpdate.checkAnyConditionMatch(data);
// If the condition applied an empty result indicates success
if (conditionCheck)
return new TxnData();
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index f88b1eb3b8..2affc5d778 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -25,11 +25,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.function.Function;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import accord.api.Data;
+import accord.api.Key;
import accord.api.Update;
import accord.primitives.Keys;
import accord.primitives.Participants;
@@ -37,17 +39,22 @@ import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
import accord.primitives.Timestamp;
import accord.utils.Invariants;
+import accord.utils.SimpleBitSet;
+import accord.utils.SimpleBitSets;
+import accord.utils.SortedArrays;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
+import org.apache.cassandra.io.UnversionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.PreserveTimestamp;
import org.apache.cassandra.service.accord.AccordObjectSizes;
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.accord.serializers.SerializePacked;
import org.apache.cassandra.service.accord.serializers.TableMetadatas;
import org.apache.cassandra.service.accord.serializers.TableMetadatasAndKeys;
import org.apache.cassandra.service.accord.serializers.Version;
@@ -55,17 +62,19 @@ import
org.apache.cassandra.service.accord.txn.TxnCondition.SerializedTxnConditi
import org.apache.cassandra.service.accord.txn.TxnWrite.Fragment;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ArraySerializers;
+import org.apache.cassandra.utils.CollectionSerializers;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Pair;
+import static accord.utils.ArrayBuffers.cachedInts;
import static accord.utils.Invariants.requireArgument;
import static accord.utils.SortedArrays.Search.CEIL;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Boolean.FALSE;
import static
org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
-import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
-import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
-import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static
org.apache.cassandra.service.accord.txn.TxnUpdate.BlockFragment.NO_BLOCK_FRAGMENTS;
+import static
org.apache.cassandra.service.accord.txn.TxnUpdate.ConditionalBlock.NO_CONDITIONAL_BLOCKS;
import static org.apache.cassandra.utils.ArraySerializers.skipArray;
import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
import static
org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
@@ -77,13 +86,454 @@ import static
org.apache.cassandra.utils.NullableSerializer.serializedNullableSi
public class TxnUpdate extends AccordUpdate
{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new
TxnUpdate(TableMetadatas.none(), null, new ByteBuffer[0], null, null,
PreserveTimestamp.no));
+ static class ConditionalBlock
+ {
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
ConditionalBlock(0, null, null));
+ static final ConditionalBlock[] NO_CONDITIONAL_BLOCKS = new
ConditionalBlock[0];
+ public static final UnversionedSerializer<ConditionalBlock> serializer
= new UnversionedSerializer<>()
+ {
+ @Override
+ public void serialize(ConditionalBlock t, DataOutputPlus out)
throws IOException
+ {
+ out.writeUnsignedVInt32(t.id);
+ writeWithVIntLength(t.condition.bytes(), out);
+
SerializePacked.serializePackedSortedIntsAndLength(t.fragmentIds, out);
+ }
+
+ @Override
+ public ConditionalBlock deserialize(DataInputPlus in) throws
IOException
+ {
+ int id = in.readUnsignedVInt32();
+ SerializedTxnCondition condition = new
SerializedTxnCondition(readWithVIntLength(in));
+
+ // Deserialize mutations
+ int[] mutations =
SerializePacked.deserializePackedSortedIntsAndLength(in);
+ return new ConditionalBlock(id, condition, mutations);
+ }
+
+ @Override
+ public void skip(DataInputPlus in) throws IOException
+ {
+ in.readUnsignedVInt32();
+ skipWithVIntLength(in);
+ SerializePacked.skipPackedSortedIntsAndLength(in);
+ }
+
+ @Override
+ public long serializedSize(ConditionalBlock t)
+ {
+ long size = TypeSizes.sizeofUnsignedVInt(t.id);
+ size += serializedSizeWithVIntLength(t.condition.bytes());
+ size +=
SerializePacked.serializedSizeOfPackedSortedIntsAndLength(t.fragmentIds);
+ return size;
+ }
+ };
+
+ final int id;
+ final SerializedTxnCondition condition;
+ final int[] fragmentIds;
+
+ ConditionalBlock(int id, SerializedTxnCondition condition, int[]
fragmentIds)
+ {
+ this.id = id;
+ this.condition = condition;
+ this.fragmentIds = fragmentIds;
+ }
+
+ public long estimatedSizeOnHeap()
+ {
+ long size = EMPTY_SIZE;
+ size += condition.estimatedSizeOnHeap();
+ size += ObjectSizes.sizeOfArray(fragmentIds);
+ return size;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) return false;
+ ConditionalBlock that = (ConditionalBlock) o;
+ return id == that.id && condition.equals(that.condition) &&
Arrays.equals(fragmentIds, that.fragmentIds);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(id, condition, Arrays.hashCode(fragmentIds));
+ }
+
+ public void toString(StringBuilder sb, TableMetadatas tables, Block
block)
+ {
+ sb.append("{condition=")
+ .append(condition.deserialize(tables))
+ .append(", fragments=")
+ .append(deserialize(tables, block, fragmentIds))
+ .append('}');
+ }
+
+ public ConditionalBlock merge(ConditionalBlock that)
+ {
+ requireArgument(this.id == that.id, "Tried to merge different
blocks; expected %d but given %d", this.id, that.id);
+ return new ConditionalBlock(id, condition,
SortedArrays.linearUnion(this.fragmentIds, 0, this.fragmentIds.length,
that.fragmentIds, 0, that.fragmentIds.length, cachedInts()));
+ }
+ }
+
+ static class BlockFragment
+ {
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
BlockFragment(0, null, null));
+ static final BlockFragment[] NO_BLOCK_FRAGMENTS = new BlockFragment[0];
+ public static final ParameterisedUnversionedSerializer<BlockFragment,
TableMetadatasAndKeys> serializer = new ParameterisedUnversionedSerializer<>()
+ {
+ @Override
+ public void serialize(BlockFragment t, TableMetadatasAndKeys p,
DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(t.id);
+ p.serializeKey(t.key, out);
+ writeWithVIntLength(t.bytes, out);
+ }
+
+ @Override
+ public BlockFragment deserialize(TableMetadatasAndKeys p,
DataInputPlus in) throws IOException
+ {
+ int id = in.readUnsignedVInt32();
+ PartitionKey key = p.deserializeKey(in);
+ ByteBuffer bytes = readWithVIntLength(in);
+ return new BlockFragment(id, key, bytes);
+ }
+
+ @Override
+ public void skip(TableMetadatasAndKeys p, DataInputPlus in) throws
IOException
+ {
+ in.readUnsignedVInt32();
+ p.skipKeys(in);
+ skipWithVIntLength(in);
+ }
+
+ @Override
+ public long serializedSize(BlockFragment t, TableMetadatasAndKeys
p)
+ {
+ long size = TypeSizes.sizeofUnsignedVInt(t.id);
+ size += p.serializedKeySize(t.key);
+ size += serializedSizeWithVIntLength(t.bytes);
+ return size;
+ }
+ };
+
+ final int id;
+ final PartitionKey key;
+ final ByteBuffer bytes;
+
+ BlockFragment(int id, PartitionKey key, ByteBuffer bytes)
+ {
+ this.id = id;
+ this.key = key;
+ this.bytes = bytes;
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof BlockFragment && equals((BlockFragment)
that);
+ }
+
+ public boolean equals(BlockFragment that)
+ {
+ return this.id == that.id && this.key.equals(that.key) &&
this.bytes.equals(that.bytes);
+ }
+
+ public long estimatedSizeOnHeap()
+ {
+ long size = EMPTY_SIZE;
+ size += ObjectSizes.sizeOnHeapOf(bytes);
+ // don't count key as reference to key in parent
+ return size;
+ }
+ }
+
+ static class Block
+ {
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
Block(null, null));
+ public static final ParameterisedUnversionedSerializer<Block,
TableMetadatasAndKeys> serializer = new ParameterisedUnversionedSerializer<>()
+ {
+ @Override
+ public void serialize(Block t, TableMetadatasAndKeys p,
DataOutputPlus out) throws IOException
+ {
+ ArraySerializers.serializeArray(t.fragments, p, out,
BlockFragment.serializer);
+ ArraySerializers.serializeArray(t.conditionalBlocks, out,
ConditionalBlock.serializer);
+ }
+
+ @Override
+ public Block deserialize(TableMetadatasAndKeys p, DataInputPlus
in) throws IOException
+ {
+ BlockFragment[] fragments =
ArraySerializers.deserializeArray(p, in, BlockFragment.serializer,
BlockFragment[]::new);
+ ConditionalBlock[] conditionalBlocks =
ArraySerializers.deserializeArray(in, ConditionalBlock.serializer,
ConditionalBlock[]::new);
+ return new Block(fragments, conditionalBlocks);
+ }
+
+ @Override
+ public void skip(TableMetadatasAndKeys p, DataInputPlus in) throws
IOException
+ {
+ ArraySerializers.skipArray(p, in, BlockFragment.serializer);
+ ArraySerializers.skipArray(in, ConditionalBlock.serializer);
+ }
+
+ @Override
+ public long serializedSize(Block t, TableMetadatasAndKeys p)
+ {
+ long size = 0;
+ size += ArraySerializers.serializedArraySize(t.fragments, p,
BlockFragment.serializer);
+ size +=
ArraySerializers.serializedArraySize(t.conditionalBlocks,
ConditionalBlock.serializer);
+ return size;
+ }
+ };
+
+ final BlockFragment[] fragments;
+ final ConditionalBlock[] conditionalBlocks;
+
+ Block(BlockFragment[] fragments, ConditionalBlock[] conditionalBlocks)
+ {
+ this.fragments = fragments;
+ this.conditionalBlocks = conditionalBlocks;
+ }
+
+ public long estimatedSizeOnHeap()
+ {
+ long size = EMPTY_SIZE;
+ size += ObjectSizes.sizeOfArray(fragments);
+ for (BlockFragment bf : fragments)
+ size += bf.estimatedSizeOnHeap();
+ for (ConditionalBlock conditionalBlock : conditionalBlocks)
+ size += conditionalBlock.estimatedSizeOnHeap();
+ return size;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) return false;
+ Block block = (Block) o;
+ return Arrays.equals(fragments, block.fragments) &&
Arrays.equals(conditionalBlocks, block.conditionalBlocks);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(Arrays.hashCode(fragments),
Arrays.hashCode(conditionalBlocks));
+ }
+
+ public void toString(StringBuilder sb, TableMetadatas tables)
+ {
+ sb.append("{conditionalBlocks=[");
+ for (int j = 0; j < conditionalBlocks.length; j++)
+ {
+ if (j > 0) sb.append(", ");
+ conditionalBlocks[j].toString(sb, tables, this);
+ }
+ sb.append("]}");
+ }
+
+ public Block select(Keys keys)
+ {
+ int[] outFragmentIds = cachedInts().getInts(fragments.length);
+ BlockFragment[] outFragments;
+ int count = 0;
+ {
+ {
+ int i = 0, j = 0;
+ while (i < keys.size() && j < fragments.length)
+ {
+ Key key = keys.get(i++);
+ j = SortedArrays.exponentialSearch(fragments, j,
fragments.length, key, (k, b) -> k.compareTo(b.key), CEIL);
+ if (j < 0) j = -1 - j;
+ else
+ {
+ do outFragmentIds[count++] = j;
+ while (++j < fragments.length &&
fragments[j].key.equals(key));
+ }
+ }
+ }
+
+ if (count == fragments.length)
+ return this;
+
+ if (count == 0)
+ return new Block(NO_BLOCK_FRAGMENTS,
NO_CONDITIONAL_BLOCKS);
+
+ outFragments = new BlockFragment[count];
+ for (int i = 0 ; i < count ; ++i)
+ {
+ outFragments[i] = fragments[outFragmentIds[i]];
+ outFragmentIds[i] = outFragments[i].id;
+ }
+ }
+
+ ConditionalBlock[] outConditions;
+ {
+ List<ConditionalBlock> collect = null;
+ for (int i = 0 ; i < conditionalBlocks.length ; ++i)
+ {
+ ConditionalBlock cb = conditionalBlocks[i];
+ int[] cbOutFragmentIds =
SortedArrays.linearIntersection(cb.fragmentIds, 0, cb.fragmentIds.length,
outFragmentIds, 0, count, cachedInts());
+ //noinspection ArrayEquality
+ if (cbOutFragmentIds != cb.fragmentIds) // when arrays are
equal the cb.fragmentIds gets returned unchanged, so can do a pointer check to
detect a change
+ {
+ if (collect == null)
+ {
+ collect = new ArrayList<>(conditionalBlocks.length
- 1);
+ for (int j = 0 ; j < i ; ++j) //TODO (review): why
do we include the previous blocks that "should" have empty fragments, but we
provide them without empty fragments?
+ collect.add(conditionalBlocks[j]);
+ }
+ if (cbOutFragmentIds.length > 0)
+ collect.add(new ConditionalBlock(cb.id,
cb.condition, cbOutFragmentIds));
+ }
+ }
+ if (collect == null) outConditions = conditionalBlocks;
+ else if (collect.isEmpty()) outConditions =
NO_CONDITIONAL_BLOCKS;
+ else outConditions = collect.toArray(ConditionalBlock[]::new);
+ }
+
+ cachedInts().forceDiscard(outFragmentIds);
+ return new Block(outFragments, outConditions);
+ }
+
+ public Block merge(Block that)
+ {
+ BlockFragment[] outFragments;
+ if (this.fragments.length == 0) outFragments = that.fragments;
+ else if (that.fragments.length == 0) outFragments = this.fragments;
+ else
+ {
+ int minId = Math.min(this.fragments[0].id,
that.fragments[0].id);
+ int maxId = Math.max(this.fragments[this.fragments.length -
1].id, that.fragments[that.fragments.length - 1].id);
+ outFragments = new
BlockFragment[Math.min(this.fragments.length + that.fragments.length, 1 +
(maxId - minId))];
+
+ int i = 0, j = 0, count = 0;
+ while (i < this.fragments.length || j < that.fragments.length)
+ {
+ int cmp;
+ if (i == this.fragments.length) cmp = 1;
+ else if (j == that.fragments.length) cmp = -1;
+ else cmp = this.fragments[i].id - that.fragments[j].id;
+
+ if (cmp <= 0)
+ {
+ outFragments[count] = this.fragments[i];
+ ++i;
+ j += cmp == 0 ? 1 : 0;
+ }
+ else
+ {
+ outFragments[count] = that.fragments[j];
+ ++j;
+ }
+ ++count;
+ }
+
+ if (count != outFragments.length)
+ outFragments = Arrays.copyOf(outFragments, count);
+ }
+
+ ConditionalBlock[] outConditions;
+ if (this.conditionalBlocks.length == 0) outConditions =
that.conditionalBlocks;
+ else if (that.conditionalBlocks.length == 0) outConditions =
this.conditionalBlocks;
+ else
+ {
+ int minId = Math.min(this.conditionalBlocks[0].id,
that.conditionalBlocks[0].id);
+ int maxId =
Math.max(this.conditionalBlocks[this.conditionalBlocks.length - 1].id,
that.conditionalBlocks[that.conditionalBlocks.length - 1].id);
+ outConditions = new
ConditionalBlock[Math.min(this.conditionalBlocks.length +
that.conditionalBlocks.length, 1 + maxId - minId)];
+ int i = 0, j = 0, count = 0;
+ while (i < this.conditionalBlocks.length || j <
that.conditionalBlocks.length)
+ {
+ int cmp;
+ if (i == this.conditionalBlocks.length) cmp = 1;
+ else if (j == that.conditionalBlocks.length) cmp = -1;
+ else cmp = this.conditionalBlocks[i].id -
that.conditionalBlocks[j].id;
+
+ if (cmp == 0)
+ outConditions[count] =
this.conditionalBlocks[i++].merge(that.conditionalBlocks[j++]);
+ else if (cmp < 0)
+ outConditions[count] = this.conditionalBlocks[i++];
+ else
+ outConditions[count] = that.conditionalBlocks[j++];
+ ++count;
+ }
+ if (count < outConditions.length)
+ outConditions = Arrays.copyOf(outConditions, count);
+ }
+ return new Block(outFragments, outConditions);
+ }
+ }
+
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
TxnUpdate(TableMetadatas.none(), Keys.EMPTY, Collections.emptyList(), null,
PreserveTimestamp.no));
private static final int FLAG_PRESERVE_TIMESTAMPS = 0x1;
final TableMetadatas tables;
- private final Keys keys;
- private final ByteBuffer[] fragments;
- private final SerializedTxnCondition condition;
+ final Keys keys;
+ /**
+ * CASSANDRA-20883 added this logic in, but didn't update the CQL layer to
leverage it; left for follow-up work.
+ * <p>
+ * The reason for this setup is to allow the following in CQL (and any
combination of them):
+ * <p>
+ * <code>
+ * IF cond1 THEN
+ * mutation1
+ * ELSE
+ * mutation2
+ * END IF
+ * </code>
+ * <p>
+ * <code>
+ * IF cond1 THEN
+ * mutation1
+ * IF cond2 THEN
+ * mutation2
+ * ELSE
+ * mutation3
+ * END IF
+ * ELSE IF cond3 THEN
+ * mutation4
+ * END IF
+ * </code>
+ * <p>
+ * and lastly
+ * <p>
+ * <code>
+ * IF cond THEN
+ * mutation1
+ * END IF
+ * mutation2
+ * </code>
+ * <p>
+ * Each {@link Block} represents a single <code>IF / END IF</code> block.
+ * Each {@link ConditionalBlock} represents a single condition with its
mutations
+ * <p>
+ * Given the flat structure, you must rewrite the <code>IF / END IF</code>
into this structure, so for cases like a nested IF the conditions should be
uplifted like so:
+ * <p>
+ * Before
+ * <code>
+ * IF cond1 THEN
+ * mutation1
+ * IF cond2 THEN
+ * mutation2
+ * ELSE
+ * mutation3
+ * END IF
+ * END IF
+ * </code>
+ * <p>
+ * After
+ * <code>
+ * IF cond1 AND cond2 THEN
+ * mutation1
+ * mutation2
+ * ELSE IF cond1 THEN
+ * mutation1
+ * mutation3
+ * END IF
+ * </code>
+ * <p>
+ * When a non-conditional set of mutations exists with conditional ones,
then the non-conditional mutations should
+ * be in their own block with an empty condition.
+ */
+ final List<Block> blocks;
@Nullable
private final ConsistencyLevel cassandraCommitCL;
@@ -94,46 +544,51 @@ public class TxnUpdate extends AccordUpdate
private final PreserveTimestamp preserveTimestamps;
// Memoize computation of condition
- private Boolean conditionResult;
+ private Boolean anyConditionResult;
public TxnUpdate(TableMetadatas tables, List<Fragment> fragments,
TxnCondition condition, @Nullable ConsistencyLevel cassandraCommitCL,
PreserveTimestamp preserveTimestamps)
{
requireArgument(cassandraCommitCL == null ||
IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(cassandraCommitCL));
+ fragments.sort(Fragment::compareKeys);
this.tables = tables;
this.keys = Keys.of(fragments, fragment -> fragment.key);
- fragments.sort(Fragment::compareKeys);
+
+ BlockFragment[] blockFragments = new BlockFragment[fragments.size()];
// TODO (required): this node could be on version N while the peers
are on N-1, which would have issues as the peers wouldn't know about N yet.
// Can not eagerly serialize until we know the "correct" version,
else we need a way to fallback on mismatch.
- this.fragments = toSerializedValuesArray(keys, fragments, tables,
Version.LATEST);
- // TODO (desired): slice TxnCondition, or pick a single shard to
persist it
- this.condition = new SerializedTxnCondition(condition, tables);
- this.condition.unmemoize();
- this.condition.deserialize(tables);
+ int[] fragmentIds = new int[fragments.size()];
+ for (int i = 0 ; i < fragments.size() ; ++i)
+ {
+ blockFragments[i] = new BlockFragment(i, fragments.get(i).key,
Fragment.FragmentSerializer.serialize(fragments.get(i), tables,
Version.LATEST));
+ fragmentIds[i] = i;
+ }
+
+ SerializedTxnCondition serializedCondition = new
SerializedTxnCondition(condition, tables);
+ this.blocks = Collections.singletonList(new Block(blockFragments, new
ConditionalBlock[] { new ConditionalBlock(0, serializedCondition, fragmentIds)
}));
this.cassandraCommitCL = cassandraCommitCL;
this.preserveTimestamps = preserveTimestamps;
}
- private TxnUpdate(TableMetadatas tables, Keys keys, ByteBuffer[]
fragments, SerializedTxnCondition condition, ConsistencyLevel
cassandraCommitCL, PreserveTimestamp preserveTimestamps)
+ private TxnUpdate(TableMetadatas tables, Keys keys, List<Block> blocks,
ConsistencyLevel cassandraCommitCL, PreserveTimestamp preserveTimestamps)
{
this.tables = tables;
this.keys = keys;
- this.fragments = fragments;
- this.condition = condition;
+ this.blocks = blocks;
this.cassandraCommitCL = cassandraCommitCL;
this.preserveTimestamps = preserveTimestamps;
}
public static TxnUpdate empty()
{
- return new TxnUpdate(TableMetadatas.none(), Collections.emptyList(),
TxnCondition.none(), null, PreserveTimestamp.no);
+ return new TxnUpdate(TableMetadatas.none(), Keys.EMPTY,
Collections.emptyList(), null, PreserveTimestamp.no);
}
@Override
public long estimatedSizeOnHeap()
{
- long size = EMPTY_SIZE + condition.estimatedSizeOnHeap();
- for (ByteBuffer update : fragments)
- size += ByteBufferUtil.estimatedSizeOnHeap(update);
+ long size = EMPTY_SIZE;
+ for (Block block : blocks)
+ size += block.estimatedSizeOnHeap();
size += AccordObjectSizes.keys(keys);
return size;
}
@@ -141,8 +596,14 @@ public class TxnUpdate extends AccordUpdate
@Override
public String toString()
{
- return "TxnUpdate{updates=" + deserialize(keys, tables, fragments) +
- ", condition=" + condition.deserialize(tables) + '}';
+ StringBuilder sb = new StringBuilder("TxnUpdate{blocks=[");
+ for (int i = 0; i < blocks.size(); i++)
+ {
+ if (i > 0) sb.append(", ");
+ blocks.get(i).toString(sb, tables);
+ }
+ sb.append("]}");
+ return sb.toString();
}
@Override
@@ -151,21 +612,18 @@ public class TxnUpdate extends AccordUpdate
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TxnUpdate txnUpdate = (TxnUpdate) o;
- return Arrays.equals(fragments, txnUpdate.fragments) &&
Objects.equals(condition, txnUpdate.condition);
+ return Objects.equals(blocks, txnUpdate.blocks);
}
@Override
public int hashCode()
{
- int result = Objects.hash(condition);
- result = 31 * result + Arrays.hashCode(fragments);
- return result;
+ return Objects.hash(blocks);
}
@Override
public Keys keys()
{
- // TODO: It doesn't seem to affect correctness, but should we return
the union of the fragment + condition keys?
return keys;
}
@@ -177,58 +635,39 @@ public class TxnUpdate extends AccordUpdate
}
@Override
- public Update slice(Ranges ranges)
+ public TxnUpdate slice(Ranges ranges)
{
- Keys keys = this.keys.slice(ranges);
- // TODO (desired): Slice the condition.
- return new TxnUpdate(tables, keys, select(this.keys, keys, fragments),
condition, cassandraCommitCL, preserveTimestamps);
+ return getTxnUpdate(keys -> keys.slice(ranges));
}
@Override
- public Update intersecting(Participants<?> participants)
+ public TxnUpdate intersecting(Participants<?> participants)
{
- Keys keys = this.keys.intersecting(participants);
- // TODO (desired): Slice the condition.
- return new TxnUpdate(tables, keys, select(this.keys, keys, fragments),
condition, cassandraCommitCL, preserveTimestamps);
+ return getTxnUpdate(keys -> keys.intersecting(participants));
}
- private static ByteBuffer[] select(Keys in, Keys out, ByteBuffer[] from)
+ @VisibleForTesting
+ TxnUpdate getTxnUpdate(Function<Keys, Keys> fn)
{
- ByteBuffer[] result = new ByteBuffer[out.size()];
- int j = 0;
- for (int i = 0 ; i < out.size() ; ++i)
- {
- j = in.findNext(j, out.get(i), CEIL);
- result[i] = from[j];
- }
- return result;
+ Keys newKeys = fn.apply(keys);
+ List<Block> blocks = new ArrayList<>(this.blocks.size());
+ for (Block block : this.blocks)
+ blocks.add(block.select(newKeys));
+ return new TxnUpdate(tables, newKeys, blocks, cassandraCommitCL,
preserveTimestamps);
}
@Override
- public Update merge(Update update)
+ public TxnUpdate merge(Update update)
{
TxnUpdate that = (TxnUpdate) update;
- Keys mergedKeys = this.keys.with(that.keys);
- // TODO (desired): special method for linear merging keyed and
non-keyed lists simultaneously
- ByteBuffer[] mergedFragments = merge(this.keys, that.keys,
this.fragments, that.fragments, mergedKeys.size());
- return new TxnUpdate(tables, mergedKeys, mergedFragments, condition,
cassandraCommitCL, preserveTimestamps);
- }
-
- private static ByteBuffer[] merge(Keys leftKeys, Keys rightKeys,
ByteBuffer[] left, ByteBuffer[] right, int outputSize)
- {
- ByteBuffer[] out = new ByteBuffer[outputSize];
- int l = 0, r = 0, o = 0;
- while (l < leftKeys.size() && r < rightKeys.size())
- {
- int c = leftKeys.get(l).compareTo(rightKeys.get(r));
- if (c < 0) { out[o++] = left[l++]; }
- else if (c > 0) { out[o++] = right[r++]; }
- else if (ByteBufferUtil.compareUnsigned(left[l], right[r]) != 0) {
throw new IllegalStateException("The same keys have different values in each
input"); }
- else { out[o++] = left[l++]; r++; }
- }
- while (l < leftKeys.size()) { out[o++] = left[l++]; }
- while (r < rightKeys.size()) { out[o++] = right[r++]; }
- return out;
+ requireArgument(that.blocks.size() == this.blocks.size(), "Blocks dont
have the same sizes; expected %d but was %d", this.blocks.size(),
that.blocks.size());
+ Keys keys = this.keys.with(that.keys);
+
+ List<Block> mergedBlocks = new ArrayList<>(this.blocks.size());
+ for (int i = 0; i < this.blocks.size(); i++)
+ mergedBlocks.add(this.blocks.get(i).merge(that.blocks.get(i)));
+
+ return new TxnUpdate(tables, keys, mergedBlocks, cassandraCommitCL,
preserveTimestamps);
}
@Override
@@ -236,33 +675,43 @@ public class TxnUpdate extends AccordUpdate
{
ClusterMetadata cm = ClusterMetadata.current();
checkState(cm.epoch.getEpoch() >= executeAt.epoch(), "TCM epoch %d is
< executeAt epoch %d", cm.epoch.getEpoch(), executeAt.epoch());
- if (!checkCondition(data))
- return TxnWrite.EMPTY_CONDITION_FAILED;
-
- if (keys.isEmpty())
- return new TxnWrite(TableMetadatas.none(),
Collections.emptyList(), true);
- List<Fragment> fragments = deserialize(keys, tables, this.fragments);
- List<TxnWrite.Update> updates = new ArrayList<>(fragments.size());
- QueryOptions options =
QueryOptions.forProtocolVersion(ProtocolVersion.CURRENT);
- AccordUpdateParameters parameters = new
AccordUpdateParameters((TxnData) data, options, executeAt.uniqueHlc());
+ Pair<List<TxnWrite.Update>, SimpleBitSet> pair =
processCondition(executeAt, data);
+ if (pair == null)
+ return new TxnWrite(TableMetadatas.none(),
Collections.emptyList(), SimpleBitSets.allUnset(numConditionalBlocks()));
- for (Fragment fragment : fragments)
- // Filter out fragments that already constitute complete updates
to avoid persisting them via TxnWrite:
- if (!fragment.isComplete())
- updates.add(fragment.complete(parameters, tables));
+ List<TxnWrite.Update> allUpdates = pair.left;
+ SimpleBitSet conditionalBlockBitSet = pair.right;
+ if (keys.isEmpty())
+ return new TxnWrite(TableMetadatas.none(),
Collections.emptyList(), SimpleBitSets.allSet(numConditionalBlocks()));
- return new TxnWrite(tables, updates, true);
+ return new TxnWrite(tables, allUpdates, conditionalBlockBitSet);
}
- public List<TxnWrite.Update> completeUpdatesForKey(RoutableKey key)
+
+ private boolean checkCondition(Data data, SerializedTxnCondition condition)
{
- List<Fragment> fragments = deserialize(keys, tables, this.fragments);
- List<TxnWrite.Update> updates = new ArrayList<>(fragments.size());
+ TxnCondition deserializedCondition = condition.deserialize(tables);
+ if (deserializedCondition == TxnCondition.none())
+ return true;
+ return deserializedCondition.applies((TxnData) data);
+ }
- for (Fragment fragment : fragments)
- if (fragment.isComplete() && fragment.key.equals(key))
- updates.add(fragment.toUpdate(tables));
+ public List<TxnWrite.Update> completeUpdatesForKey(SimpleBitSet
conditionalBlockBitSet, RoutableKey key)
+ {
+ List<TxnWrite.Update> updates = new ArrayList<>();
+
+ for (Block block : blocks)
+ {
+ for (ConditionalBlock conditionalBlock : block.conditionalBlocks)
+ {
+ if (!conditionalBlockBitSet.get(conditionalBlock.id)) continue;
+ List<Fragment> fragments = deserialize(tables, block,
conditionalBlock.fragmentIds);
+ for (Fragment fragment : fragments)
+ if (fragment.isComplete() && fragment.key.equals(key))
+ updates.add(fragment.toUpdate(tables));
+ }
+ }
return updates;
}
@@ -273,13 +722,12 @@ public class TxnUpdate extends AccordUpdate
public void serialize(TxnUpdate update, TableMetadatasAndKeys
tablesAndKeys, DataOutputPlus out, Version version) throws IOException
{
// Serializing it with the condition result set shouldn't be needed
- checkState(update.conditionResult == null, "Can't serialize if
conditionResult is set without adding it to serialization");
+ checkState(update.anyConditionResult == null, "Can't serialize if
conditionResult is set without adding it to serialization");
// Once in accord "mixedTimeSource" and "yes" are the same, so
only care about the side effect: that the timestamp is preserved or not
out.writeByte(update.preserveTimestamps.preserve ?
FLAG_PRESERVE_TIMESTAMPS : 0);
tablesAndKeys.serializeKeys(update.keys, out);
- writeWithVIntLength(update.condition.bytes(), out);
- serializeArray(update.fragments, out,
ByteBufferUtil.byteBufferSerializer);
serializeNullable(update.cassandraCommitCL, out,
consistencyLevelSerializer);
+ CollectionSerializers.serializeList(update.blocks, tablesAndKeys,
out, Block.serializer);
}
@Override
@@ -288,20 +736,18 @@ public class TxnUpdate extends AccordUpdate
int flags = in.readByte();
boolean preserveTimestamps = (FLAG_PRESERVE_TIMESTAMPS & flags) ==
1;
Keys keys = tablesAndKeys.deserializeKeys(in);
- ByteBuffer condition = readWithVIntLength(in);
- ByteBuffer[] fragments = deserializeArray(in,
ByteBufferUtil.byteBufferSerializer, ByteBuffer[]::new);
ConsistencyLevel consistencyLevel = deserializeNullable(in,
consistencyLevelSerializer);
- return new TxnUpdate(tablesAndKeys.tables, keys, fragments, new
SerializedTxnCondition(condition), consistencyLevel, preserveTimestamps ?
PreserveTimestamp.yes : PreserveTimestamp.no);
+ List<Block> blocks =
CollectionSerializers.deserializeList(tablesAndKeys, in, Block.serializer);
+
+ return new TxnUpdate(tablesAndKeys.tables, keys, blocks,
consistencyLevel, preserveTimestamps ? PreserveTimestamp.yes :
PreserveTimestamp.no);
}
@Override
public void skip(TableMetadatasAndKeys tablesAndKeys, DataInputPlus
in, Version version) throws IOException
{
- in.readByte();
- tablesAndKeys.skipKeys(in);
- skipWithVIntLength(in);
- skipArray(in, ByteBufferUtil.byteBufferSerializer);
- deserializeNullable(in, consistencyLevelSerializer);
+ in.readByte(); // flags
+ deserializeNullable(in, consistencyLevelSerializer); //
consistency level
+ skipArray(tablesAndKeys, in, Block.serializer);
}
@Override
@@ -309,105 +755,102 @@ public class TxnUpdate extends AccordUpdate
{
long size = 1; // flags
size += tablesAndKeys.serializedKeysSize(update.keys);
- size += serializedSizeWithVIntLength(update.condition.bytes());
- size += serializedArraySize(update.fragments,
ByteBufferUtil.byteBufferSerializer);
size += serializedNullableSize(update.cassandraCommitCL,
consistencyLevelSerializer);
+ size += CollectionSerializers.serializedListSize(update.blocks,
tablesAndKeys, Block.serializer);
return size;
}
};
- private static ByteBuffer[] toSerializedValuesArray(Keys keys,
List<Fragment> items, TableMetadatas tables, Version version)
+ private static List<Fragment> deserialize(TableMetadatas tables, Block
block, int[] includeFragmentIds)
{
- ByteBuffer[] result = new ByteBuffer[keys.size()];
- int i = 0, mi = items.size(), ki = 0;
- while (i < mi)
+ List<Fragment> result = new ArrayList<>(includeFragmentIds.length);
+ int i = 0;
+ for (int fragmentId : includeFragmentIds)
{
- PartitionKey key = items.get(i).key;
- int j = i + 1;
- while (j < mi && items.get(j).key.equals(key))
- ++j;
-
- int nextki = keys.findNext(ki, key, CEIL);
- Arrays.fill(result, ki, nextki, ByteBufferUtil.EMPTY_BYTE_BUFFER);
- ki = nextki;
- result[ki++] = toSerializedValues(items, tables, i, j, version);
- i = j;
+ while (block.fragments[i].id < fragmentId)
+ ++i;
+
+ Invariants.require(block.fragments[i].id == fragmentId);
+ BlockFragment fragment = block.fragments[i];
+ try (DataInputBuffer in = new DataInputBuffer(fragment.bytes,
true))
+ {
+ Version version = Version.fromVersion(in.readUnsignedVInt32());
+ result.add(Fragment.serializer.deserialize(fragment.key,
tables, in, version));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- Arrays.fill(result, ki, result.length,
ByteBufferUtil.EMPTY_BYTE_BUFFER);
return result;
}
- private static ByteBuffer toSerializedValues(List<Fragment> items,
TableMetadatas tables, int start, int end, Version version)
+ @Override
+ public void failCondition()
{
- long size = TypeSizes.sizeofUnsignedVInt(version.version) +
TypeSizes.sizeofUnsignedVInt(end - start);
- for (int i = start ; i < end ; ++i)
- size += Fragment.serializer.serializedSize(items.get(i), tables,
version);
+ anyConditionResult = FALSE;
+ }
- try (DataOutputBuffer out = new DataOutputBuffer((int) size))
- {
- out.writeUnsignedVInt32(version.version);
- out.writeUnsignedVInt32(end - start);
- for (int i = start ; i < end ; ++i)
- Fragment.serializer.serialize(items.get(i), tables, out,
version);
- return out.buffer(false);
- }
- catch (IOException e)
+ @Override
+ public boolean checkAnyConditionMatch(Data data)
+ {
+ // Assert data that was memoized is same as data that is provided?
+ if (anyConditionResult != null)
+ return anyConditionResult;
+
+ // Check if any block has a matching condition
+ for (Block block : blocks)
{
- throw new RuntimeException(e);
+ for (ConditionalBlock conditionalBlock : block.conditionalBlocks)
+ {
+ if (checkCondition(data, conditionalBlock.condition))
+ return anyConditionResult = true;
+ }
}
+ return anyConditionResult = false;
}
- private static List<Fragment> deserialize(PartitionKey key, TableMetadatas
tables, ByteBuffer bytes)
+ @Nullable
+ private Pair<List<TxnWrite.Update>, SimpleBitSet>
processCondition(Timestamp executeAt, Data data)
{
- if (!bytes.hasRemaining())
- return Collections.emptyList();
-
- try (DataInputBuffer in = new DataInputBuffer(bytes, true))
+ int numConditionalBlocks = numConditionalBlocks();
+ SimpleBitSet conditionalBlocksMatched =
SimpleBitSet.allocate(numConditionalBlocks);
+ List<Fragment> fragments = null;
+ // Each block is executed independently so a match in one block has
no effect on another block,
+ // this is done this way to support conditional with unconditional
writes, and multiple IF/END IF blocks
+ for (Block block : blocks)
{
- Version version = Version.fromVersion(in.readUnsignedVInt32());
- int count = in.readUnsignedVInt32();
- switch (count)
+ // This loop needs to support the expected semantics of IF/ELSE
IF/ELSE blocks;
+ // first condition that is true is the only one that applies.
+ for (ConditionalBlock conditionalBlock : block.conditionalBlocks)
{
- case 0: throw new IllegalStateException();
- case 1: return
Collections.singletonList(Fragment.serializer.deserialize(key, tables, in,
version));
- default:
- List<Fragment> result = new ArrayList<>();
- for (int i = 0 ; i < count ; ++i)
- result.add(Fragment.serializer.deserialize(key,
tables, in, version));
- return result;
+ if (checkCondition(data, conditionalBlock.condition))
+ {
+ conditionalBlocksMatched.set(conditionalBlock.id);
+ if (fragments == null) fragments = new ArrayList<>();
+ fragments.addAll(deserialize(tables, block,
conditionalBlock.fragmentIds));
+ break;
+ }
}
}
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
+ if (fragments == null) return null;
- private static List<Fragment> deserialize(Keys keys, TableMetadatas
tables, ByteBuffer[] buffers)
- {
- Invariants.require(keys.size() == buffers.length);
- List<Fragment> result = new ArrayList<>(buffers.length);
- for (int i = 0 ; i < keys.size() ; ++i)
- result.addAll(deserialize((PartitionKey) keys.get(i), tables,
buffers[i]));
- return result;
- }
+ List<TxnWrite.Update> allUpdates = new ArrayList<>(fragments.size());
+ QueryOptions options =
QueryOptions.forProtocolVersion(ProtocolVersion.CURRENT);
+ AccordUpdateParameters parameters = new
AccordUpdateParameters((TxnData) data, options, executeAt.uniqueHlc());
- @Override
- public void failCondition()
- {
- conditionResult = FALSE;
+ for (Fragment fragment : fragments)
+ if (!fragment.isComplete())
+ allUpdates.add(fragment.complete(parameters, tables));
+ return Pair.create(allUpdates, conditionalBlocksMatched);
}
- @Override
- public boolean checkCondition(Data data)
+ private int numConditionalBlocks()
{
- // Assert data that was memoized is same as data that is provided?
- if (conditionResult != null)
- return conditionResult;
- TxnCondition condition = this.condition.deserialize(tables);
- if (condition == TxnCondition.none())
- return conditionResult = true;
- return conditionResult = condition.applies((TxnData) data);
+ int numConditionalBlocks = 0;
+ for (Block block : blocks)
+ numConditionalBlocks += block.conditionalBlocks.length;
+ return numConditionalBlocks;
}
@Override
@@ -425,6 +868,6 @@ public class TxnUpdate extends AccordUpdate
@VisibleForTesting
public void unsafeResetCondition()
{
- conditionResult = null;
+ anyConditionResult = null;
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index e8e70d7be3..26af2aaaaf 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -42,6 +42,8 @@ import accord.primitives.Seekable;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.utils.SimpleBitSet;
+import accord.utils.SimpleBitSets;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import org.apache.cassandra.cql3.UpdateParameters;
@@ -69,6 +71,7 @@ import
org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.utils.BooleanSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.SimpleBitSetSerializers;
import static com.google.common.base.Preconditions.checkState;
import static
org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE;
@@ -84,9 +87,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
@SuppressWarnings("unused")
private static final Logger logger =
LoggerFactory.getLogger(TxnWrite.class);
- public static final TxnWrite EMPTY_CONDITION_FAILED = new
TxnWrite(TableMetadatas.none(), Collections.emptyList(), false);
-
- private static final long EMPTY_SIZE =
ObjectSizes.measure(EMPTY_CONDITION_FAILED);
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
TxnWrite(TableMetadatas.none(), Collections.emptyList(),
SimpleBitSets.allUnset(1)));
public static class Update extends
AbstractParameterisedVersionedSerialized<PartitionUpdate, TableMetadatas>
{
@@ -398,24 +399,54 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
size += TypeSizes.sizeofUnsignedVInt(fragment.timestamp);
return size;
}
+
+ public static ByteBuffer serialize(Fragment fragment,
TableMetadatas tables, Version version)
+ {
+ long size = TypeSizes.sizeofUnsignedVInt(version.version);
+ size += Fragment.serializer.serializedSize(fragment, tables,
version);
+
+ try (DataOutputBuffer out = new DataOutputBuffer((int) size))
+ {
+ out.writeUnsignedVInt32(version.version);
+ Fragment.serializer.serialize(fragment, tables, out,
version);
+ return out.buffer(false);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static Fragment deserialize(PartitionKey key,
TableMetadatas tables, ByteBuffer bytes)
+ {
+ try (DataInputBuffer in = new DataInputBuffer(bytes, true))
+ {
+ Version version =
Version.fromVersion(in.readUnsignedVInt32());
+ return Fragment.serializer.deserialize(key, tables, in,
version);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
}
public final TableMetadatas tables;
- private final boolean isConditionMet;
+ private final SimpleBitSet conditionalBlockBitSet;
- private TxnWrite(TableMetadatas tables, Update[] items, boolean
isConditionMet)
+ private TxnWrite(TableMetadatas tables, Update[] items, SimpleBitSet
conditionalBlockBitSet)
{
super(items, Domain.Key);
this.tables = tables;
- this.isConditionMet = isConditionMet;
+ this.conditionalBlockBitSet = conditionalBlockBitSet;
}
- public TxnWrite(TableMetadatas tables, List<Update> items, boolean
isConditionMet)
+ public TxnWrite(TableMetadatas tables, List<Update> items, SimpleBitSet
conditionalBlockBitSet)
{
super(items, Domain.Key);
this.tables = tables;
- this.isConditionMet = isConditionMet;
+ this.conditionalBlockBitSet = conditionalBlockBitSet;
}
@Override
@@ -461,7 +492,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
// TODO (expected): optimise for the common single update case; lots
of lists allocated
List<AsyncChain<Void>> results = new ArrayList<>();
- if (isConditionMet)
+ if (!conditionalBlockBitSet.isEmpty())
{
AccordExecutor executor = ((AccordCommandStore)
commandStore).executor();
boolean preserveTimestamps =
txnUpdate.preserveTimestamps().preserve;
@@ -471,7 +502,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
// Apply updates that are fully specified by the client and not
reliant on data from reads.
// ex. INSERT INTO tbl (a, b, c) VALUES (1, 2, 3)
// These updates are persisted only in TxnUpdate and not in
TxnWrite to avoid duplication.
- List<Update> updates =
txnUpdate.completeUpdatesForKey((RoutableKey) key);
+ List<Update> updates =
txnUpdate.completeUpdatesForKey(conditionalBlockBitSet, (RoutableKey) key);
updates.forEach(write -> results.add(write.write(executor, tables,
preserveTimestamps, timestamp)));
}
@@ -498,7 +529,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
public void serialize(TxnWrite write, Seekables keys, DataOutputPlus
out, Version version) throws IOException
{
write.tables.serializeSelf(out);
- BooleanSerializer.serializer.serialize(write.isConditionMet, out);
+
SimpleBitSetSerializers.any.serialize(write.conditionalBlockBitSet, out);
serializeArray(write.items, new
TableMetadatasAndKeys(write.tables, keys), out, version, Update.serializer);
}
@@ -506,8 +537,8 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
public TxnWrite deserialize(Seekables keys, DataInputPlus in, Version
version) throws IOException
{
TableMetadatas tables = TableMetadatas.deserializeSelf(in);
- boolean isConditionMet =
BooleanSerializer.serializer.deserialize(in);
- return new TxnWrite(tables, deserializeArray(new
TableMetadatasAndKeys(tables, keys), in, version, Update.serializer,
Update[]::new), isConditionMet);
+ SimpleBitSet conditionalBlockBitSet =
SimpleBitSetSerializers.any.deserialize(in);
+ return new TxnWrite(tables, deserializeArray(new
TableMetadatasAndKeys(tables, keys), in, version, Update.serializer,
Update[]::new), conditionalBlockBitSet);
}
@Override
@@ -522,7 +553,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
public long serializedSize(TxnWrite write, Seekables keys, Version
version)
{
return write.tables.serializedSelfSize()
- +
BooleanSerializer.serializer.serializedSize(write.isConditionMet)
+ +
SimpleBitSetSerializers.any.serializedSize(write.conditionalBlockBitSet)
+ serializedArraySize(write.items, new
TableMetadatasAndKeys(write.tables, keys), version, Update.serializer);
}
};
diff --git a/src/java/org/apache/cassandra/utils/ArraySerializers.java
b/src/java/org/apache/cassandra/utils/ArraySerializers.java
index 7ca6afa8a7..db2cd28248 100644
--- a/src/java/org/apache/cassandra/utils/ArraySerializers.java
+++ b/src/java/org/apache/cassandra/utils/ArraySerializers.java
@@ -23,6 +23,7 @@ import java.util.function.IntFunction;
import org.apache.cassandra.io.AsymmetricVersionedSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
import org.apache.cassandra.io.ParameterisedVersionedSerializer;
import org.apache.cassandra.io.UnversionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -60,6 +61,13 @@ public class ArraySerializers
serializer.serialize(item, p, out, version);
}
+ public static <T, P> void serializeArray(T[] items, P p, DataOutputPlus
out, ParameterisedUnversionedSerializer<T, P> serializer) throws IOException
+ {
+ out.writeUnsignedVInt32(items.length);
+ for (T item : items)
+ serializer.serialize(item, p, out);
+ }
+
public static <T> T[] deserializeArray(DataInputPlus in,
UnversionedSerializer<T> serializer, IntFunction<T[]> arrayFactory) throws
IOException
{
int size = in.readUnsignedVInt32();
@@ -118,6 +126,22 @@ public class ArraySerializers
serializer.skip(p, in, version);
}
+ public static <T, P> T[] deserializeArray(P p, DataInputPlus in,
ParameterisedUnversionedSerializer<T, P> serializer, IntFunction<T[]>
arrayFactory) throws IOException
+ {
+ int size = in.readUnsignedVInt32();
+ T[] items = arrayFactory.apply(size);
+ for (int i = 0; i < size; i++)
+ items[i] = serializer.deserialize(p, in);
+ return items;
+ }
+
+ public static <T, P> void skipArray(P p, DataInputPlus in,
ParameterisedUnversionedSerializer<T, P> serializer) throws IOException
+ {
+ int size = in.readUnsignedVInt32();
+ for (int i = 0; i < size; i++)
+ serializer.skip(p, in);
+ }
+
public static <T> long serializedArraySize(T[] array,
UnversionedSerializer<T> serializer)
{
long size = sizeofUnsignedVInt(array.length);
@@ -149,4 +173,12 @@ public class ArraySerializers
size += serializer.serializedSize(item, p, version);
return size;
}
+
+ public static <T, P> long serializedArraySize(T[] array, P p,
ParameterisedUnversionedSerializer<T, P> serializer)
+ {
+ long size = sizeofUnsignedVInt(array.length);
+ for (T item : array)
+ size += serializer.serializedSize(item, p);
+ return size;
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
index d4718096a3..b2efd717ab 100644
--- a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
+++ b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
@@ -580,7 +580,7 @@ public class CollectionSerializers
return result;
}
- private static <V, P, C extends Collection<? super V>, Version> C
deserializeCollection(P p, DataInputPlus in,
AsymmetricParameterisedUnversionedSerializer<?, P, V> serializer,
IntFunction<C> factory) throws IOException
+ private static <V, P, C extends Collection<? super V>> C
deserializeCollection(P p, DataInputPlus in,
AsymmetricParameterisedUnversionedSerializer<?, P, V> serializer,
IntFunction<C> factory) throws IOException
{
int size = in.readUnsignedVInt32();
C result = factory.apply(size);
diff --git a/test/unit/org/apache/cassandra/io/Serializers.java
b/test/unit/org/apache/cassandra/io/Serializers.java
index e9e7f04c7b..34da9e4b35 100644
--- a/test/unit/org/apache/cassandra/io/Serializers.java
+++ b/test/unit/org/apache/cassandra/io/Serializers.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.function.BiConsumer;
import accord.utils.LazyToString;
import accord.utils.ReflectionUtils;
@@ -33,6 +34,13 @@ public class Serializers
// @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
DataOutputBuffer output = new DataOutputBuffer();
public static <T> void testSerde(DataOutputBuffer output,
AsymmetricUnversionedSerializer<T, T> serializer, T input) throws IOException
+ {
+ testSerde(output, serializer, input, (actual, expected) ->
Assertions.assertThat(actual)
+
.describedAs("The deserialized output does not match the serialized input;
difference %s", new LazyToString(() -> ReflectionUtils.recursiveEquals(actual,
input).toString()))
+
.isEqualTo(expected));
+ }
+
+ public static <T> void testSerde(DataOutputBuffer output,
AsymmetricUnversionedSerializer<T, T> serializer, T input, BiConsumer<T, T>
testEqual) throws IOException
{
output.clear();
long expectedSize = serializer.serializedSize(input);
@@ -41,7 +49,7 @@ public class Serializers
ByteBuffer buffer = output.unsafeGetBufferAndFlip();
DataInputBuffer in = new DataInputBuffer(buffer, false);
T read = serializer.deserialize(in);
- Assertions.assertThat(read).describedAs("The deserialized output does
not match the serialized input; difference %s", new LazyToString(() ->
ReflectionUtils.recursiveEquals(read, input).toString())).isEqualTo(input);
+ testEqual.accept(read, input);
Assertions.assertThat(buffer.remaining()).describedAs("deserialize did
not consume all the serialized input").isEqualTo(0);
buffer.flip();
buffer.mark();
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 786b38f7ed..7a348d6c45 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -99,6 +99,7 @@ import accord.utils.AccordGens;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
+import accord.utils.SimpleBitSets;
import accord.utils.SortedArrays;
import accord.utils.UnhandledEnum;
import accord.utils.async.AsyncChain;
@@ -204,7 +205,7 @@ public class CommandsForKeySerializerTest
if (saveStatus.known.outcome() == Known.Outcome.Apply)
{
if (txnId.is(Kind.Write))
- builder.writes(new Writes(txnId, executeAt, txn.keys(),
new TxnWrite(TableMetadatas.none(), Collections.emptyList(), true)));
+ builder.writes(new Writes(txnId, executeAt, txn.keys(),
new TxnWrite(TableMetadatas.none(), Collections.emptyList(),
SimpleBitSets.allSet(1))));
builder.result(new TxnData());
}
return builder;
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/SerializePackedTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/SerializePackedTest.java
new file mode 100644
index 0000000000..2c355139e6
--- /dev/null
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/SerializePackedTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.serializers;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import accord.utils.Gens;
+import org.junit.Test;
+
+import accord.utils.Gen;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.UnversionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+public class SerializePackedTest
+{
+ private static final Gen<int[]> zeros = rs -> new int[rs.nextInt(0, 10)];
+ private static final Gen<int[]> randomZeroAndPositive = array(rs ->
rs.nextInt(-1, Integer.MAX_VALUE) + 1);
+ private static final Gen<int[]> randomZeroAndPositiveSmall = array(rs ->
rs.nextInt(0, 1 << 8));
+ private static final Gen<int[]> negatives =
array(Gens.ints().between(Integer.MIN_VALUE, -1), rs -> rs.nextInt(1, 10));
+
+ private static final Gen<int[]> monotonic = rs -> {
+ int[] array = new int[rs.nextInt(0, 10)];
+ for (int i = 0; i < array.length; i++)
+ array[i] = i;
+ return array;
+ };
+
+ private static final Gen<int[]> zeroAndPositive = rs -> {
+ if (rs.decide(0.2f)) return monotonic.next(rs);
+ return randomZeroAndPositive.next(rs);
+ };
+
+ private static final Gen<int[]> zeroAndPositiveSmall = rs -> {
+ if (rs.decide(0.2f)) return monotonic.next(rs);
+ return randomZeroAndPositiveSmall.next(rs);
+ };
+
+ private static Gen<int[]> array(Gen.IntGen valueGen)
+ {
+ return array(valueGen, rs -> rs.nextInt(0, 10));
+ }
+
+ private static Gen<int[]> array(Gen.IntGen valueGen, Gen.IntGen size)
+ {
+ return rs -> {
+ int[] array = new int[size.nextInt(rs)];
+ for (int i = 0; i < array.length; i++)
+ array[i] = valueGen.nextInt(rs);
+ Arrays.sort(array);
+ return array;
+ };
+ }
+
+ @Test
+ public void serde()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(zeroAndPositive).check(array ->
Serializers.testSerde(output, PackedSortedSerializer.instance, array));
+ }
+
+ @Test
+ public void serdeNegative()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(negatives).check(array -> {
+ output.clear();
+ Assertions.assertThatThrownBy(() ->
PackedSortedSerializer.instance.serialize(array, output))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Found a negative value at offset");
+ });
+ }
+
+ @Test
+ public void serdeZeros()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(zeros).check(array -> Serializers.testSerde(output,
PackedSortedSerializer.instance, array));
+ }
+
+ @Test
+ public void serializerIsSmallerThanSimpleList()
+ {
+ qt().forAll(zeroAndPositiveSmall).check(array -> {
+ var list = SimpleListSerializer.instance.serialize(array);
+ var packed = PackedSortedSerializer.instance.serialize(array);
+
+
Assertions.assertThat(packed.remaining()).isLessThanOrEqualTo(list.remaining());
+ });
+ }
+
+ public static class PackedSortedSerializer implements
UnversionedSerializer<int[]>
+ {
+ public static final PackedSortedSerializer instance = new
PackedSortedSerializer();
+
+ @Override
+ public void serialize(int[] t, DataOutputPlus out) throws IOException
+ {
+ SerializePacked.serializePackedSortedIntsAndLength(t, out);
+ }
+
+ @Override
+ public int[] deserialize(DataInputPlus in) throws IOException
+ {
+ return SerializePacked.deserializePackedSortedIntsAndLength(in);
+ }
+
+ @Override
+ public long serializedSize(int[] t)
+ {
+ return
SerializePacked.serializedSizeOfPackedSortedIntsAndLength(t);
+ }
+ }
+
+ public static class SimpleListSerializer implements
UnversionedSerializer<int[]>
+ {
+ public static final SimpleListSerializer instance = new
SimpleListSerializer();
+
+ @Override
+ public void serialize(int[] t, DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(t.length);
+ for (int i : t)
+ out.writeVInt32(i);
+ }
+
+ @Override
+ public int[] deserialize(DataInputPlus in) throws IOException
+ {
+ int size = in.readUnsignedVInt32();
+ int[] array = new int[size];
+ for (int i = 0; i < size; i++)
+ array[i] = in.readVInt32();
+ return array;
+ }
+
+ @Override
+ public long serializedSize(int[] t)
+ {
+ long size = TypeSizes.sizeofUnsignedVInt(t.length);
+ for (int i = 0; i < t.length; i++)
+ size += TypeSizes.sizeofVInt(t[i]);
+ return size;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/service/accord/txn/TxnUpdateTest.java
b/test/unit/org/apache/cassandra/service/accord/txn/TxnUpdateTest.java
new file mode 100644
index 0000000000..9edf8ea089
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnUpdateTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.SortedArrays;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.PreserveTimestamp;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.accord.serializers.TableMetadatas;
+import org.apache.cassandra.service.accord.serializers.TableMetadatasAndKeys;
+import
org.apache.cassandra.service.accord.txn.TxnCondition.SerializedTxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnUpdate.Block;
+import org.apache.cassandra.service.accord.txn.TxnUpdate.ConditionalBlock;
+import org.apache.cassandra.service.accord.txn.TxnWrite.Fragment;
+import org.apache.cassandra.utils.AccordGenerators;
+import org.apache.cassandra.utils.CassandraGenerators;
+import org.apache.cassandra.utils.Generators;
+
+import static accord.utils.Property.qt;
+import static accord.utils.SortedArrays.Search.FAST;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TxnUpdateTest
+{
+ private static final LongToken T0 = new LongToken(0);
+ private static final LongToken T42 = new LongToken(42);
+
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ }
+
+ private static final Gen<ByteBuffer> bytesGen =
Generators.toGen(Generators.bytes(0, 20));
+ private static final Gen<List<TableId>> uniqueIds =
Gens.lists(Generators.toGen(CassandraGenerators.TABLE_ID_GEN)).unique().ofSizeBetween(1,
3);
+ private static final Gen<List<TableMetadata>> tablesGen =
uniqueIds.map(ids -> {
+ List<TableMetadata> tables = new ArrayList<>();
+ for (int i = 0; i < ids.size(); i++)
+ {
+ tables.add(TableMetadata.builder("ks", "tbl" + i, ids.get(i))
+ .addPartitionKeyColumn("key",
BytesType.instance)
+ .partitioner(Murmur3Partitioner.instance)
+ .build());
+ }
+ return tables;
+ });
+
+ @Test
+ public void conditionalBlockSerde()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(conditionalBlock()).check(expected ->
Serializers.testSerde(output, ConditionalBlock.serializer, expected));
+ }
+
+ @Test
+ public void blockSerde()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(block()).check(expected -> {
+ TableMetadatasAndKeys.KeyCollector collector = new
TableMetadatasAndKeys.KeyCollector(TableMetadatas.none());
+ for (TxnUpdate.BlockFragment fragment : expected.fragments)
+ collector.add(fragment.key);
+ Serializers.testSerde(output, Block.serializer, expected,
collector.buildTablesAndKeys());
+ });
+ }
+
+ @Test
+ public void slice()
+ {
+ qt().check(rs -> {
+ List<TableMetadata> tables = tablesGen.next(rs);
+ TableMetadatas metadatas = TableMetadatas.of(tables);
+ List<Fragment> fragments =
Gens.lists(fragment(tables)).ofSizeBetween(1, 10).next(rs);
+ TxnUpdate update = new TxnUpdate(metadatas, fragments,
TxnCondition.none(), null, PreserveTimestamp.no);
+
+ // ask for ranges outside the update; should be empty
+ for (var block :
update.slice(Ranges.single(TokenRange.create(TableId.UNDEFINED, T0,
T42))).blocks)
+ {
+ assertThat(block.fragments).isEmpty();
+ for (var cb : block.conditionalBlocks)
+ assertThat(cb.fragmentIds).isEmpty();
+ }
+
+ // slicing by all of the update's own keys should return the very same blocks
+ TxnUpdate noUpdate = update.getTxnUpdate(k ->
k.intersecting(update.keys()));
+ for (int i = 0; i < update.blocks.size(); i++)
+
assertThat(noUpdate.blocks.get(i)).isSameAs(update.blocks.get(i));
+
+ // slicing to a single key keeps only that key's fragments
+ if (update.keys().size() == 1) return;
+ int keyIndex = rs.nextInt(0, update.keys().size());
+ Key key = update.keys().get(keyIndex);
+ Keys singleKey = Keys.of(key);
+ TxnUpdate singleKeyUpdate = update.getTxnUpdate(k ->
k.intersecting(singleKey));
+ for (int i = 0; i < update.blocks.size(); i++)
+ {
+ var block = singleKeyUpdate.blocks.get(i);
+
assertThat(block.fragments).hasSize((int)fragments.stream().filter(f ->
f.key.equals(key)).count());
+ for (ConditionalBlock conditionalBlock :
block.conditionalBlocks)
+ {
+ for (int fragmentId : conditionalBlock.fragmentIds)
+ {
+ int fragmentIndex =
SortedArrays.binarySearch(block.fragments, 0, block.fragments.length,
fragmentId, (id, bf) -> Integer.compare(id, bf.id), FAST);
+ assertThat(fragmentIndex >= 0).isTrue();
+
assertThat(block.fragments[fragmentIndex].key).isEqualTo(key);
+ }
+ }
+ }
+ });
+ }
+
+ @Test
+ public void merge()
+ {
+ qt().check(rs -> {
+ List<TableMetadata> tables = tablesGen.next(rs);
+ TableMetadatas metadatas = TableMetadatas.of(tables);
+ List<Fragment> fragments =
Gens.lists(fragment(tables)).ofSizeBetween(1, 10).next(rs);
+ TxnUpdate update = new TxnUpdate(metadatas, fragments,
TxnCondition.none(), null, PreserveTimestamp.no);
+ TxnUpdate emptyUpdate =
update.slice(Ranges.single(TokenRange.create(TableId.UNDEFINED, T0, T42)));
+ List<TxnUpdate> perKeyUpdate = new
ArrayList<>(update.keys().size());
+ for (int i = 0; i < update.keys().size(); i++)
+ {
+ int finalI = i;
+ perKeyUpdate.add(update.getTxnUpdate(k ->
k.intersecting(Keys.of(update.keys().get(finalI)))));
+ }
+
+ assertThat(update.merge(update)).isEqualTo(update); // merge with
self produces self
+ assertThat(emptyUpdate.merge(emptyUpdate)).isEqualTo(emptyUpdate);
// merging empty with empty produces empty
+
+ // empty with full is commutative
+ assertThat(update.merge(emptyUpdate)).isEqualTo(update);
+ assertThat(emptyUpdate.merge(update)).isEqualTo(update);
+
+ // merge per key is commutative
+ TxnUpdate accum = emptyUpdate;
+ for (TxnUpdate other : perKeyUpdate)
+ accum = accum.merge(other);
+ assertThat(accum).isEqualTo(update);
+
+ accum = emptyUpdate;
+ Collections.reverse(perKeyUpdate);
+ for (TxnUpdate other : perKeyUpdate)
+ accum = accum.merge(other);
+ assertThat(accum).isEqualTo(update);
+ });
+ }
+
+ private static Gen<Fragment> fragment(List<TableMetadata> tables)
+ {
+ return rs -> {
+ var metadata = rs.pick(tables);
+ var pk = bytesGen.next(rs);
+ DecoratedKey key = metadata.partitioner.decorateKey(pk);
+
+ PartitionUpdate update = PartitionUpdate.emptyUpdate(metadata,
key);
+
+ return new Fragment(new PartitionKey(metadata.id, key),
rs.nextInt(0, Integer.MAX_VALUE), update, TxnReferenceOperations.empty(),
rs.nextLong(1, Long.MAX_VALUE));
+ };
+ }
+
+ private static Gen<ConditionalBlock> conditionalBlock()
+ {
+ Gen<SerializedTxnCondition> serializedTxnConditionGen =
serializedTxnCondition();
+ Gen<int[]> fragmentsGen = Gens.arrays(Gens.ints().between(0,
Integer.MAX_VALUE)).ofSizeBetween(0, 10).map(vs -> { Arrays.sort(vs); return
vs; });
+ return rs -> {
+ int id = rs.nextInt(-1, Integer.MAX_VALUE) + 1;
+ SerializedTxnCondition condition =
serializedTxnConditionGen.next(rs);
+ int[] fragments = fragmentsGen.next(rs);
+ return new ConditionalBlock(id, condition, fragments);
+ };
+ }
+
+ private static Gen<SerializedTxnCondition> serializedTxnCondition()
+ {
+ Gen<ByteBuffer> bytesGen =
TxnUpdateTest.bytesGen.filter(ByteBuffer::hasRemaining);
+ return rs -> new SerializedTxnCondition(bytesGen.next(rs));
+ }
+
+ private static Gen<Block> block()
+ {
+ // can't have an empty block
+ Gen<ByteBuffer[]> bytesGen = Gens.arrays(ByteBuffer.class,
TxnUpdateTest.bytesGen.filter(ByteBuffer::hasRemaining))
+ .ofSizeBetween(0,
10);
+ var conditionGen = Gens.arrays(ConditionalBlock.class,
conditionalBlock()).ofSizeBetween(1, 10);
+ Gen<Key> keyGen = (Gen<Key>) (Gen<?>)
AccordGenerators.keys(Murmur3Partitioner.instance);
+ return rs -> {
+ ByteBuffer[] bbs = bytesGen.next(rs);
+ Key[] keys = IntStream.range(0, bbs.length).mapToObj(i ->
keyGen.next(rs)).toArray(Key[]::new);
+ int[] ids = IntStream.range(0, bbs.length).toArray();
+ Arrays.sort(ids);
+ Arrays.sort(keys);
+ TxnUpdate.BlockFragment[] fragments = new
TxnUpdate.BlockFragment[bbs.length];
+ for (int i = 0 ; i < fragments.length ; ++i)
+ fragments[i] = new TxnUpdate.BlockFragment(ids[i],
(PartitionKey) keys[i], bbs[i]);
+ return new Block(fragments, conditionGen.next(rs));
+ };
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 3b4fad9a0f..61696303d1 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -69,6 +69,7 @@ import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
import accord.utils.ReducingRangeMap;
+import accord.utils.SimpleBitSets;
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.TinyEnumSet;
import accord.utils.TriFunction;
@@ -290,7 +291,7 @@ public class AccordGenerators
if (saveStatus.hasBeen(Status.PreApplied) &&
!saveStatus.hasBeen(Status.Truncated))
{
if (txnId.is(Write))
- builder.writes(new Writes(txnId, executeAt, keysOrRanges,
new TxnWrite(TableMetadatas.none(), Collections.emptyList(), true)));
+ builder.writes(new Writes(txnId, executeAt, keysOrRanges,
new TxnWrite(TableMetadatas.none(), Collections.emptyList(),
SimpleBitSets.allSet(1))));
builder.result(new TxnData());
}
return builder;
@@ -345,8 +346,8 @@ public class AccordGenerators
else return Truncated.truncated(command, saveStatus,
executeAt, null, null, null, null);
case TruncatedApplyWithOutcome:
- if (txnId.kind().awaitsOnlyDeps()) return
Truncated.truncated(command, saveStatus, executeAt, command.partialDeps(),
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges, new
TxnWrite(TableMetadatas.none(), Collections.emptyList(), true)) : null, new
TxnData(), txnId);
- else return Truncated.truncated(command, saveStatus,
executeAt, command.partialDeps(), txnId.is(Write) ? new Writes(txnId,
executeAt, keysOrRanges, new TxnWrite(TableMetadatas.none(),
Collections.emptyList(), true)) : null, new TxnData(), null);
+ if (txnId.kind().awaitsOnlyDeps()) return
Truncated.truncated(command, saveStatus, executeAt, command.partialDeps(),
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges, new
TxnWrite(TableMetadatas.none(), Collections.emptyList(),
SimpleBitSets.allSet(1))) : null, new TxnData(), txnId);
+ else return Truncated.truncated(command, saveStatus,
executeAt, command.partialDeps(), txnId.is(Write) ? new Writes(txnId,
executeAt, keysOrRanges, new TxnWrite(TableMetadatas.none(),
Collections.emptyList(), SimpleBitSets.allSet(1))) : null, new TxnData(), null);
case Erased:
case Vestigial:
diff --git
a/test/unit/org/apache/cassandra/utils/LargeBitSetSerializerTest.java
b/test/unit/org/apache/cassandra/utils/LargeBitSetSerializerTest.java
deleted file mode 100644
index c32b69a5c2..0000000000
--- a/test/unit/org/apache/cassandra/utils/LargeBitSetSerializerTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.utils;
-
-import accord.utils.Gen;
-import accord.utils.LargeBitSet;
-import org.apache.cassandra.io.Serializers;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.junit.Test;
-
-import static accord.utils.Property.qt;
-
-public class LargeBitSetSerializerTest
-{
- @Test
- public void test()
- {
- @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
- qt().forAll(largeBitSetGen()).check(bits ->
Serializers.testSerde(output, SimpleBitSetSerializers.large, bits));
- }
-
- private static Gen<LargeBitSet> largeBitSetGen()
- {
- return rs -> {
- int size = rs.nextInt(0, 1 << 10);
- LargeBitSet bitSet = new LargeBitSet(size);
- if (size == 0 || rs.decide(0.2))
- return bitSet; // empty
- if (rs.decide(0.2))
- {
- // set 1 bit randomly
- bitSet.set(rs.nextInt(0, size));
- return bitSet;
- }
- for (int i = 0; i < size; i++)
- {
- if (rs.nextBoolean())
- bitSet.set(i);
- }
- return bitSet;
- };
- }
-}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/utils/SimpleBitSetSerializersTest.java
b/test/unit/org/apache/cassandra/utils/SimpleBitSetSerializersTest.java
new file mode 100644
index 0000000000..0ca2ff5e81
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/SimpleBitSetSerializersTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import org.junit.Test;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.LargeBitSet;
+import accord.utils.SimpleBitSet;
+import accord.utils.SmallBitSet;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+public class SimpleBitSetSerializersTest
+{
+ @Test
+ public void small()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(smallGen()).check(bits -> Serializers.testSerde(output,
SimpleBitSetSerializers.small, bits));
+ }
+
+ @Test
+ public void large()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(largeGen()).check(bits -> Serializers.testSerde(output,
SimpleBitSetSerializers.large, bits));
+ }
+
+ @Test
+ public void any()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().forAll(anyGen()).check(bits -> {
+ Serializers.testSerde(output, SimpleBitSetSerializers.any, bits,
(actual, expected) -> {
+ if (actual.getClass() == expected.getClass())
+ {
+ Assertions.assertThat(actual)
+ .describedAs("The deserialized output does not
match the serialized input")
+ .isEqualTo(expected);
+ }
+ else
+ {
+ // a LargeBitSet can come back as a SmallBitSet when deserialized
+
Assertions.assertThat(expected.getClass()).isEqualTo(LargeBitSet.class);
+
Assertions.assertThat(actual.getClass()).isEqualTo(SmallBitSet.class);
+
+
Assertions.assertThat(actual.nextSetBit(0)).isEqualTo(expected.nextSetBit(0));
+
+ for (int i = actual.nextSetBit(0); i >= 0;)
+ {
+ Assertions.assertThat(actual.nextSetBit(i + 1))
+ .describedAs("Difference searching for next
bit from %s", (i + 1))
+ .isEqualTo(expected.nextSetBit(i + 1));
+ i = actual.nextSetBit(i + 1);
+ }
+ }
+ });
+ });
+ }
+
+ private static Gen<SimpleBitSet> anyGen()
+ {
+ return rs -> rs.nextBoolean() ? smallGen().next(rs) :
largeGen().next(rs);
+ }
+
+ private static Gen<SmallBitSet> smallGen()
+ {
+ return Gens.longs().all().map(SmallBitSet::new);
+ }
+
+ private static Gen<LargeBitSet> largeGen()
+ {
+ return rs -> {
+ int size = rs.nextInt(0, 1 << 10);
+ LargeBitSet bitSet = new LargeBitSet(size);
+ if (size == 0 || rs.decide(0.2))
+ return bitSet; // empty
+ if (rs.decide(0.2))
+ {
+ // set 1 bit randomly
+ bitSet.set(rs.nextInt(0, size));
+ return bitSet;
+ }
+ for (int i = 0; i < size; i++)
+ {
+ if (rs.nextBoolean())
+ bitSet.set(i);
+ }
+ return bitSet;
+ };
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]