This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e0f62370d3dd23bbf47fac464a701aaa94c924f6 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Mon Jan 27 16:27:06 2025 +0000 Follow-up to CASSANDRA-20228: - Fix AccordUpdateParameters to correctly supply List cell paths derived from applyAt - Fix ExecuteAtSerialize.serialiseNullable - Fix topologies flush regression caused by markRetired forcing a flush on every call patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-20228 --- .../apache/cassandra/cql3/UpdateParameters.java | 17 +------- .../cql3/statements/ModificationStatement.java | 23 +++++------ .../org/apache/cassandra/cql3/terms/Lists.java | 45 +--------------------- .../cassandra/service/accord/AccordKeyspace.java | 8 +--- .../accord/serializers/CommandSerializers.java | 6 +-- .../service/accord/txn/AccordUpdateParameters.java | 34 ++++++++++++---- .../compaction/CompactionAccordIteratorsTest.java | 1 - .../serializers/CommandsForKeySerializerTest.java | 1 - 8 files changed, 44 insertions(+), 91 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index 66ae2bc2691..e4ceec16fb7 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -51,10 +51,9 @@ public class UpdateParameters public final TableMetadata metadata; public final ClientState clientState; public final QueryOptions options; - public final boolean constructingAccordBaseUpdate; private final long nowInSec; - private final long timestamp; + protected final long timestamp; private final int ttl; private final DeletionTime deletionTime; @@ -75,18 +74,6 @@ public class UpdateParameters long nowInSec, int ttl, Map<DecoratedKey, Partition> prefetchedRows) throws InvalidRequestException - { - this(metadata, clientState, options, timestamp, nowInSec, ttl, prefetchedRows, false); - } - - public UpdateParameters(TableMetadata metadata, - ClientState clientState, - QueryOptions options, - long timestamp, - long nowInSec, - int ttl, - Map<DecoratedKey, Partition> prefetchedRows, - boolean constructingAccordBaseUpdate) throws InvalidRequestException { this.metadata = metadata; this.clientState = clientState; @@ -104,8 +91,6 @@ public class UpdateParameters // it to avoid potential confusion. if (timestamp == Long.MIN_VALUE) throw new InvalidRequestException(String.format("Out of bound timestamp, must be in [%d, %d]", Long.MIN_VALUE + 1, Long.MAX_VALUE)); - - this.constructingAccordBaseUpdate = constructingAccordBaseUpdate; } public <V> void newRow(Clustering<V> clustering) throws InvalidRequestException diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 0231ab16fd1..11340815178 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -924,8 +924,8 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa local, timestamp, nowInSeconds, - requestTime, - constructingAccordBaseUpdate); + requestTime + ); for (ByteBuffer key : keys) { Validation.validateKey(metadata(), key); @@ -945,7 +945,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty()) return; - UpdateParameters params = makeUpdateParameters(keys, clusterings, state, options, local, timestamp, nowInSeconds, requestTime, constructingAccordBaseUpdate); + UpdateParameters params = makeUpdateParameters(keys, clusterings, state, options, local, timestamp, nowInSeconds, requestTime); for (ByteBuffer key : keys) { @@ -982,8 +982,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa boolean local, long timestamp, long nowInSeconds, - Dispatcher.RequestTime requestTime, - boolean constructingAccordBaseUpdate) + Dispatcher.RequestTime requestTime) { if (clusterings.contains(Clustering.STATIC_CLUSTERING)) return makeUpdateParameters(keys, @@ -994,8 +993,8 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa local, timestamp, nowInSeconds, - requestTime, - constructingAccordBaseUpdate); + requestTime + ); return makeUpdateParameters(keys, new ClusteringIndexNamesFilter(clusterings, false), @@ -1005,8 +1004,8 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa local, timestamp, nowInSeconds, - requestTime, - constructingAccordBaseUpdate); + requestTime + ); } private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys, @@ -1017,8 +1016,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa boolean local, long timestamp, long nowInSeconds, - Dispatcher.RequestTime requestTime, - boolean constructingAccordBaseUpdate) + Dispatcher.RequestTime requestTime) { // Some lists operation requires reading Map<DecoratedKey, Partition> lists = @@ -1036,8 +1034,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa getTimestamp(timestamp, options), nowInSeconds, getTimeToLive(options), - lists, - constructingAccordBaseUpdate); + lists); } public static abstract class Parsed extends QualifiedStatement diff --git a/src/java/org/apache/cassandra/cql3/terms/Lists.java b/src/java/org/apache/cassandra/cql3/terms/Lists.java index a0889e75ded..9edd1e3f0bf 100644 --- a/src/java/org/apache/cassandra/cql3/terms/Lists.java +++ b/src/java/org/apache/cassandra/cql3/terms/Lists.java @@ -44,7 +44,6 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MultiElementType; -import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.db.rows.ComplexColumnData; @@ -52,7 +51,6 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; @@ -68,13 +66,6 @@ public abstract class Lists @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(Lists.class); - /** - * Sentinel value indicating the cell path should be replaced by Accord with one based on the transaction executeAt - */ - private static final TimeUUID ACCORD_CELL_PATH_SENTINEL_UUID = TimeUUID.atUnixMicrosWithLsb(0, 0); - public static final CellPath ACCORD_DUMMY_CELL_PATH = CellPath.create(ACCORD_CELL_PATH_SENTINEL_UUID.toBytes()); - private static final long ACCORD_CELL_PATH_SENTINEL_MSB = ACCORD_CELL_PATH_SENTINEL_UUID.msb(); - private Lists() {} public static ColumnSpecification indexSpecOf(ColumnSpecification column) @@ -160,33 +151,6 @@ public abstract class Lists return type == null ? null : ListType.getInstance(type, false); } - /** - * Return a function that given a cell with an ACCORD_CELL_PATH_SENTINEL_MSB will - * return a new CellPath with a TimeUUID that increases monotonically every time it is called or - * the existing cell path if path does not contain ACCORD_CELL_PATH_SENTINEL_MSB. - * - * Only intended to work with list cell paths where list append needs a timestamp based on the executeAt - * of the Accord transaction appending the cell. - * @param timestampMicros executeAt timestamp to use as the MSB for generated cell paths - */ - public static com.google.common.base.Function<Cell, CellPath> accordListPathSupplier(long timestampMicros) - { - return new com.google.common.base.Function<Cell, CellPath>() - { - final long timeUuidMsb = TimeUUID.unixMicrosToMsb(timestampMicros); - long cellIndex = 0; - @Override - public CellPath apply(Cell cell) - { - CellPath path = cell.path(); - if (ACCORD_CELL_PATH_SENTINEL_MSB == path.get(0).getLong(0)) - return CellPath.create(ByteBuffer.wrap(TimeUUID.toBytes(timeUuidMsb, TimeUUIDType.signedBytesToNativeLong(cellIndex++)))); - else - return path; - } - }; - } - public static class Literal extends Term.Raw { private final List<Term.Raw> elements; @@ -463,17 +427,10 @@ public abstract class Lists // during SSTable write. Guardrails.itemsPerCollection.guard(type.collectionSize(elements), column.name.toString(), false, params.clientState); - long cellIndex = 0; int dataSize = 0; for (ByteBuffer buffer : elements) { - ByteBuffer cellPath; - // Accord will need to replace this value later once it knows the executeAt timestamp - // so just put a TimeUUID with MSB sentinel for now - if (params.constructingAccordBaseUpdate) - cellPath = TimeUUID.atUnixMicrosWithLsb(0, cellIndex++).toBytes(); - else - cellPath = ByteBuffer.wrap(params.nextTimeUUIDAsBytes()); + ByteBuffer cellPath = ByteBuffer.wrap(params.nextTimeUUIDAsBytes()); Cell<?> cell = params.addCell(column, CellPath.create(cellPath), buffer); dataSize += cell.dataSize(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 57aab6653a6..4a60f21e2b7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -900,20 +900,16 @@ public class AccordKeyspace diskState = maybeUpdateMaxEpoch(diskState, epoch); String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + "SET closed = closed + ? WHERE epoch = ?"; - executeInternal(cql, - KeySerializers.rangesToBlobMap(ranges), epoch); + executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch); return diskState; } - // TODO (required): unused public static EpochDiskState markRetired(Ranges ranges, long epoch, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + "SET retired = retired + ? WHERE epoch = ?"; - executeInternal(cql, - KeySerializers.rangesToBlobMap(ranges), epoch); - flush(Topologies); + executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch); return diskState; } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index 436849e2907..b65acb29bc6 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -208,7 +208,7 @@ public class CommandSerializers { if ((flags & 1) != 0) return; - flags >>= 1; + flags >>>= 1; } in.readUnsignedVInt(); in.readUnsignedVInt(); @@ -240,7 +240,7 @@ public class CommandSerializers out.writeUnsignedVInt(executeAt.epoch()); out.writeUnsignedVInt(executeAt.hlc()); out.writeUnsignedVInt32(executeAt.node.id); - if ((flags & HAS_UNIQUE_HLC) != 0) + if (executeAt.hasDistinctHlcAndUniqueHlc()) out.writeUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc()); } } @@ -267,7 +267,7 @@ public class CommandSerializers size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch()); size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc()); size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id); - if ((flags & HAS_UNIQUE_HLC) != 0) + if (executeAt.hasDistinctHlcAndUniqueHlc()) size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc()); return size; } diff --git a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java index 24d3ef78aba..1bf46889dd5 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java +++ b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java @@ -26,9 +26,13 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.UpdateParameters; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.utils.TimeUUID; import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.TimeUnit.MICROSECONDS; @@ -46,6 +50,22 @@ public class AccordUpdateParameters this.timestamp = timestamp; } + static class RowUpdateParameters extends UpdateParameters + { + private long timeUuidNanos; + + public RowUpdateParameters(TableMetadata metadata, ClientState clientState, QueryOptions options, long timestamp, long nowInSec, int ttl, Map<DecoratedKey, Partition> prefetchedRows) throws InvalidRequestException + { + super(metadata, clientState, options, timestamp, nowInSec, ttl, prefetchedRows); + } + + @Override + public byte[] nextTimeUUIDAsBytes() + { + return TimeUUID.toBytes(Ballot.unixMicrosToMsb(timestamp), TimeUUIDType.signedBytesToNativeLong(timeUuidNanos++)); + } + } + public TxnData getData() { return data; @@ -60,13 +80,13 @@ public class AccordUpdateParameters // TODO : How should Accord work with TTL? int ttl = metadata.params.defaultTimeToLive; - return new UpdateParameters(metadata, - disabledGuardrails, - options, - timestamp, - MICROSECONDS.toSeconds(timestamp), - ttl, - prefetchRow(metadata, dk, rowIndex)); + return new RowUpdateParameters(metadata, + disabledGuardrails, + options, + timestamp, + MICROSECONDS.toSeconds(timestamp), + ttl, + prefetchRow(metadata, dk, rowIndex)); } private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, DecoratedKey dk, int index) diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index 50a0ca9b1f5..08485feefd2 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -57,7 +57,6 @@ import accord.primitives.Route; import accord.primitives.SaveStatus; import accord.primitives.Seekable; import accord.primitives.Status; -import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.Txn.Kind; import accord.primitives.TxnId; diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 665ebd5338f..a6a3531d8a7 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -92,7 +92,6 @@ import static accord.primitives.Known.KnownExecuteAt.ExecuteAtErased; import static accord.primitives.Known.KnownExecuteAt.ExecuteAtUnknown; import static accord.primitives.Status.Durability.Majority; import static accord.primitives.Status.Durability.NotDurable; -import static accord.primitives.Status.Durability.max; import static accord.utils.Property.qt; import static accord.utils.SortedArrays.Search.FAST; import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
