This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e0f62370d3dd23bbf47fac464a701aaa94c924f6
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Jan 27 16:27:06 2025 +0000

    Follow-up to CASSANDRA-20228:
     - Fix AccordUpdateParameters to correctly supply List cell paths derived from applyAt
     - Fix ExecuteAtSerialize.serialiseNullable
     - Fix topologies flush regression caused by markRetired forcing a flush on every call
    
    patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-20228
---
 .../apache/cassandra/cql3/UpdateParameters.java    | 17 +-------
 .../cql3/statements/ModificationStatement.java     | 23 +++++------
 .../org/apache/cassandra/cql3/terms/Lists.java     | 45 +---------------------
 .../cassandra/service/accord/AccordKeyspace.java   |  8 +---
 .../accord/serializers/CommandSerializers.java     |  6 +--
 .../service/accord/txn/AccordUpdateParameters.java | 34 ++++++++++++----
 .../compaction/CompactionAccordIteratorsTest.java  |  1 -
 .../serializers/CommandsForKeySerializerTest.java  |  1 -
 8 files changed, 44 insertions(+), 91 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java 
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 66ae2bc2691..e4ceec16fb7 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -51,10 +51,9 @@ public class UpdateParameters
     public final TableMetadata metadata;
     public final ClientState clientState;
     public final QueryOptions options;
-    public final boolean constructingAccordBaseUpdate;
 
     private final long nowInSec;
-    private final long timestamp;
+    protected final long timestamp;
     private final int ttl;
 
     private final DeletionTime deletionTime;
@@ -75,18 +74,6 @@ public class UpdateParameters
                             long nowInSec,
                             int ttl,
                             Map<DecoratedKey, Partition> prefetchedRows) 
throws InvalidRequestException
-    {
-        this(metadata, clientState, options, timestamp, nowInSec, ttl, 
prefetchedRows, false);
-    }
-
-    public UpdateParameters(TableMetadata metadata,
-                            ClientState clientState,
-                            QueryOptions options,
-                            long timestamp,
-                            long nowInSec,
-                            int ttl,
-                            Map<DecoratedKey, Partition> prefetchedRows,
-                            boolean constructingAccordBaseUpdate) throws 
InvalidRequestException
     {
         this.metadata = metadata;
         this.clientState = clientState;
@@ -104,8 +91,6 @@ public class UpdateParameters
         // it to avoid potential confusion.
         if (timestamp == Long.MIN_VALUE)
             throw new InvalidRequestException(String.format("Out of bound 
timestamp, must be in [%d, %d]", Long.MIN_VALUE + 1, Long.MAX_VALUE));
-
-        this.constructingAccordBaseUpdate = constructingAccordBaseUpdate;
     }
 
     public <V> void newRow(Clustering<V> clustering) throws 
InvalidRequestException
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0231ab16fd1..11340815178 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -924,8 +924,8 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                                                            local,
                                                            timestamp,
                                                            nowInSeconds,
-                                                           requestTime,
-                                                           
constructingAccordBaseUpdate);
+                                                           requestTime
+            );
             for (ByteBuffer key : keys)
             {
                 Validation.validateKey(metadata(), key);
@@ -945,7 +945,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
             if (restrictions.hasClusteringColumnsRestrictions() && 
clusterings.isEmpty())
                 return;
 
-            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
state, options, local, timestamp, nowInSeconds, requestTime, 
constructingAccordBaseUpdate);
+            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
state, options, local, timestamp, nowInSeconds, requestTime);
 
             for (ByteBuffer key : keys)
             {
@@ -982,8 +982,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                                                   boolean local,
                                                   long timestamp,
                                                   long nowInSeconds,
-                                                  Dispatcher.RequestTime 
requestTime,
-                                                  boolean 
constructingAccordBaseUpdate)
+                                                  Dispatcher.RequestTime 
requestTime)
     {
         if (clusterings.contains(Clustering.STATIC_CLUSTERING))
             return makeUpdateParameters(keys,
@@ -994,8 +993,8 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                                         local,
                                         timestamp,
                                         nowInSeconds,
-                                        requestTime,
-                                        constructingAccordBaseUpdate);
+                                        requestTime
+            );
 
         return makeUpdateParameters(keys,
                                     new 
ClusteringIndexNamesFilter(clusterings, false),
@@ -1005,8 +1004,8 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                                     local,
                                     timestamp,
                                     nowInSeconds,
-                                    requestTime,
-                                    constructingAccordBaseUpdate);
+                                    requestTime
+        );
     }
 
     private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
@@ -1017,8 +1016,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                                                   boolean local,
                                                   long timestamp,
                                                   long nowInSeconds,
-                                                  Dispatcher.RequestTime 
requestTime,
-                                                  boolean 
constructingAccordBaseUpdate)
+                                                  Dispatcher.RequestTime 
requestTime)
     {
         // Some lists operation requires reading
         Map<DecoratedKey, Partition> lists =
@@ -1036,8 +1034,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                                     getTimestamp(timestamp, options),
                                     nowInSeconds,
                                     getTimeToLive(options),
-                                    lists,
-                                    constructingAccordBaseUpdate);
+                                    lists);
     }
 
     public static abstract class Parsed extends QualifiedStatement
diff --git a/src/java/org/apache/cassandra/cql3/terms/Lists.java 
b/src/java/org/apache/cassandra/cql3/terms/Lists.java
index a0889e75ded..9edd1e3f0bf 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Lists.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MultiElementType;
-import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.db.rows.ComplexColumnData;
@@ -52,7 +51,6 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.TimeUUID;
 
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
@@ -68,13 +66,6 @@ public abstract class Lists
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(Lists.class);
 
-    /**
-     * Sentinel value indicating the cell path should be replaced by Accord 
with one based on the transaction executeAt
-     */
-    private static final TimeUUID ACCORD_CELL_PATH_SENTINEL_UUID = 
TimeUUID.atUnixMicrosWithLsb(0, 0);
-    public static final CellPath ACCORD_DUMMY_CELL_PATH = 
CellPath.create(ACCORD_CELL_PATH_SENTINEL_UUID.toBytes());
-    private static final long ACCORD_CELL_PATH_SENTINEL_MSB = 
ACCORD_CELL_PATH_SENTINEL_UUID.msb();
-
     private Lists() {}
 
     public static ColumnSpecification indexSpecOf(ColumnSpecification column)
@@ -160,33 +151,6 @@ public abstract class Lists
         return type == null ? null : ListType.getInstance(type, false);
     }
 
-    /**
-     * Return a function that given a cell with an 
ACCORD_CELL_PATH_SENTINEL_MSB will
-     * return a new CellPath with a TimeUUID that increases monotonically 
every time it is called or
-     * the existing cell path if path does not contain 
ACCORD_CELL_PATH_SENTINEL_MSB.
-     *
-     * Only intended to work with list cell paths where list append needs a 
timestamp based on the executeAt
-     * of the Accord transaction appending the cell.
-     * @param timestampMicros executeAt timestamp to use as the MSB for 
generated cell paths
-     */
-    public static com.google.common.base.Function<Cell, CellPath> 
accordListPathSupplier(long timestampMicros)
-    {
-        return new com.google.common.base.Function<Cell, CellPath>()
-        {
-            final long timeUuidMsb = TimeUUID.unixMicrosToMsb(timestampMicros);
-            long cellIndex = 0;
-            @Override
-            public CellPath apply(Cell cell)
-            {
-                CellPath path = cell.path();
-                if (ACCORD_CELL_PATH_SENTINEL_MSB == path.get(0).getLong(0))
-                    return 
CellPath.create(ByteBuffer.wrap(TimeUUID.toBytes(timeUuidMsb, 
TimeUUIDType.signedBytesToNativeLong(cellIndex++))));
-                else
-                    return path;
-            }
-        };
-    }
-
     public static class Literal extends Term.Raw
     {
         private final List<Term.Raw> elements;
@@ -463,17 +427,10 @@ public abstract class Lists
                 // during SSTable write.
                 
Guardrails.itemsPerCollection.guard(type.collectionSize(elements), 
column.name.toString(), false, params.clientState);
 
-                long cellIndex = 0;
                 int dataSize = 0;
                 for (ByteBuffer buffer : elements)
                 {
-                    ByteBuffer cellPath;
-                    // Accord will need to replace this value later once it 
knows the executeAt timestamp
-                    // so just put a TimeUUID with MSB sentinel for now
-                    if (params.constructingAccordBaseUpdate)
-                        cellPath = TimeUUID.atUnixMicrosWithLsb(0, 
cellIndex++).toBytes();
-                    else
-                        cellPath = 
ByteBuffer.wrap(params.nextTimeUUIDAsBytes());
+                    ByteBuffer cellPath = 
ByteBuffer.wrap(params.nextTimeUUIDAsBytes());
                     Cell<?> cell = params.addCell(column, 
CellPath.create(cellPath), buffer);
                     dataSize += cell.dataSize();
                 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 57aab6653a6..4a60f21e2b7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -900,20 +900,16 @@ public class AccordKeyspace
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
         String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' 
+
                      "SET closed = closed + ? WHERE epoch = ?";
-        executeInternal(cql,
-                        KeySerializers.rangesToBlobMap(ranges), epoch);
+        executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch);
         return diskState;
     }
 
-    // TODO (required): unused
     public static EpochDiskState markRetired(Ranges ranges, long epoch, 
EpochDiskState diskState)
     {
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
         String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' 
+
                      "SET retired = retired + ? WHERE epoch = ?";
-        executeInternal(cql,
-                        KeySerializers.rangesToBlobMap(ranges), epoch);
-        flush(Topologies);
+        executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch);
         return diskState;
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index 436849e2907..b65acb29bc6 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -208,7 +208,7 @@ public class CommandSerializers
             {
                 if ((flags & 1) != 0)
                     return;
-                flags >>= 1;
+                flags >>>= 1;
             }
             in.readUnsignedVInt();
             in.readUnsignedVInt();
@@ -240,7 +240,7 @@ public class CommandSerializers
                 out.writeUnsignedVInt(executeAt.epoch());
                 out.writeUnsignedVInt(executeAt.hlc());
                 out.writeUnsignedVInt32(executeAt.node.id);
-                if ((flags & HAS_UNIQUE_HLC) != 0)
+                if (executeAt.hasDistinctHlcAndUniqueHlc())
                     out.writeUnsignedVInt(executeAt.uniqueHlc() - 
executeAt.hlc());
             }
         }
@@ -267,7 +267,7 @@ public class CommandSerializers
             size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch());
             size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc());
             size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
-            if ((flags & HAS_UNIQUE_HLC) != 0)
+            if (executeAt.hasDistinctHlcAndUniqueHlc())
                 size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - 
executeAt.hlc());
             return size;
         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java 
b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
index 24d3ef78aba..1bf46889dd5 100644
--- 
a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
@@ -26,9 +26,13 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.UpdateParameters;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.utils.TimeUUID;
 
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
@@ -46,6 +50,22 @@ public class AccordUpdateParameters
         this.timestamp = timestamp;
     }
 
+    static class RowUpdateParameters extends UpdateParameters
+    {
+        private long timeUuidNanos;
+
+        public RowUpdateParameters(TableMetadata metadata, ClientState 
clientState, QueryOptions options, long timestamp, long nowInSec, int ttl, 
Map<DecoratedKey, Partition> prefetchedRows) throws InvalidRequestException
+        {
+            super(metadata, clientState, options, timestamp, nowInSec, ttl, 
prefetchedRows);
+        }
+
+        @Override
+        public byte[] nextTimeUUIDAsBytes()
+        {
+            return TimeUUID.toBytes(Ballot.unixMicrosToMsb(timestamp), 
TimeUUIDType.signedBytesToNativeLong(timeUuidNanos++));
+        }
+    }
+
     public TxnData getData()
     {
         return data;
@@ -60,13 +80,13 @@ public class AccordUpdateParameters
 
         // TODO : How should Accord work with TTL?
         int ttl = metadata.params.defaultTimeToLive;
-        return new UpdateParameters(metadata,
-                                    disabledGuardrails,
-                                    options,
-                                    timestamp,
-                                    MICROSECONDS.toSeconds(timestamp),
-                                    ttl,
-                                    prefetchRow(metadata, dk, rowIndex));
+        return new RowUpdateParameters(metadata,
+                                       disabledGuardrails,
+                                       options,
+                                       timestamp,
+                                       MICROSECONDS.toSeconds(timestamp),
+                                       ttl,
+                                       prefetchRow(metadata, dk, rowIndex));
     }
 
     private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, 
DecoratedKey dk, int index)
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 50a0ca9b1f5..08485feefd2 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -57,7 +57,6 @@ import accord.primitives.Route;
 import accord.primitives.SaveStatus;
 import accord.primitives.Seekable;
 import accord.primitives.Status;
-import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 665ebd5338f..a6a3531d8a7 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -92,7 +92,6 @@ import static 
accord.primitives.Known.KnownExecuteAt.ExecuteAtErased;
 import static accord.primitives.Known.KnownExecuteAt.ExecuteAtUnknown;
 import static accord.primitives.Status.Durability.Majority;
 import static accord.primitives.Status.Durability.NotDurable;
-import static accord.primitives.Status.Durability.max;
 import static accord.utils.Property.qt;
 import static accord.utils.SortedArrays.Search.FAST;
 import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to