This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a0af69846c4e4234a2b6e389008e075511a6742a
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Oct 10 10:19:50 2024 +0100

    Halve cache memory consumption by not retaining 'original' to diff;
    deduplicate RoutingKey tableId; avoid calculating rejectsFastPath in more
    cases; delay retry of fetchMajorityDeps; fix SetShardDurable marking
    shards durable
---
 modules/accord                                     |  2 +-
 .../apache/cassandra/journal/EntrySerializer.java  |  1 -
 .../apache/cassandra/journal/InMemoryIndex.java    |  1 -
 src/java/org/apache/cassandra/schema/TableId.java  |  6 +++
 .../service/accord/AccordCachingState.java         | 23 ++++-------
 .../service/accord/AccordCommandStore.java         | 17 ++++-----
 .../cassandra/service/accord/AccordJournal.java    |  1 -
 .../cassandra/service/accord/AccordKeyspace.java   | 44 +++++++++++++++-------
 .../service/accord/AccordSafeCommandStore.java     | 12 +++---
 .../cassandra/service/accord/AccordStateCache.java | 10 ++---
 .../service/accord/api/AccordRoutingKey.java       |  6 +--
 .../cassandra/service/accord/api/PartitionKey.java |  4 +-
 .../service/accord/async/AsyncOperation.java       | 22 +++++------
 .../compaction/CompactionAccordIteratorsTest.java  |  2 +-
 .../service/accord/AccordStateCacheTest.java       | 18 ++++-----
 .../service/accord/async/AsyncLoaderTest.java      | 15 ++++----
 16 files changed, 96 insertions(+), 88 deletions(-)

diff --git a/modules/accord b/modules/accord
index 841e139bc8..8bce46bee7 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 841e139bc8a974ac674ce8eae847bd52255ca544
+Subproject commit 8bce46bee7497262a8c16c6b779c08558968604f
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java 
b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index 48ef59a6e4..2a707e7d73 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.zip.CRC32;
 
-import accord.utils.Invariants;
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputBuffer;
diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java 
b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
index 8141f338a9..77fd7352ee 100644
--- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java
+++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
-import accord.utils.Invariants;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileOutputStreamPlus;
 import org.apache.cassandra.journal.StaticSegment.SequentialReader;
diff --git a/src/java/org/apache/cassandra/schema/TableId.java 
b/src/java/org/apache/cassandra/schema/TableId.java
index 302d7db6bf..03fd3dc490 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -200,6 +200,12 @@ public class TableId implements Comparable<TableId>
         return new TableId(new UUID(accessor.getLong(src, offset), 
accessor.getLong(src, offset + TypeSizes.LONG_SIZE)));
     }
 
+    public TableId tryIntern()
+    {
+        TableMetadata metadata = Schema.instance.getTableMetadata(this);
+        return metadata == null ? this : metadata.id;
+    }
+
     @Override
     public int compareTo(TableId o)
     {
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java 
b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
index 50a48be1fb..b8bb61e00c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service.accord;
 
 import java.util.concurrent.Callable;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 
@@ -208,10 +207,10 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
      * has completed, the state save will have either completed or failed.
      */
     @VisibleForTesting
-    public void save(ExecutorPlus executor, BiFunction<?, ?, Runnable> 
saveFunction)
+    public void save(ExecutorPlus executor, Function<?, Runnable> saveFunction)
     {
         @SuppressWarnings("unchecked")
-        State<K, V> savingOrLoaded = state.save((BiFunction<V, V, Runnable>) 
saveFunction);
+        State<K, V> savingOrLoaded = state.save((Function<V, Runnable>) 
saveFunction);
         if (savingOrLoaded.status() == SAVING)
             executor.submit(savingOrLoaded.saving());
         state(savingOrLoaded);
@@ -319,7 +318,7 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
             throw illegalState(this, "set(value)");
         }
 
-        default State<K, V> save(BiFunction<V, V, Runnable> saveFunction)
+        default State<K, V> save(Function<V, Runnable> saveFunction)
         {
             throw illegalState(this, "save(saveFunction)");
         }
@@ -447,7 +446,7 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
         @Override
         public State<K, V> set(V value)
         {
-            return value == original ? this : new Modified<>(original, value);
+            return value == original ? this : new Modified<>(value);
         }
 
         @Override
@@ -499,12 +498,10 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
 
     static class Modified<K, V> implements State<K, V>
     {
-        final V original;
         V current;
 
-        Modified(V original, V current)
+        Modified(V current)
         {
-            this.original = original;
             this.current = current;
         }
 
@@ -523,17 +520,14 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
         @Override
         public State<K, V> set(V value)
         {
-            if (value == original) // change reverted
-                return new Loaded<>(original);
-
             current = value;
             return this;
         }
 
         @Override
-        public State<K, V> save(BiFunction<V, V, Runnable> saveFunction)
+        public State<K, V> save(Function<V, Runnable> saveFunction)
         {
-            Runnable runnable = saveFunction.apply(original, current);
+            Runnable runnable = saveFunction.apply(current);
             if (null == runnable) // null mutation -> null Runnable -> no 
change on disk
                 return new Loaded<>(current);
             else
@@ -543,8 +537,7 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
         @Override
         public long estimateOnHeapSize(ToLongFunction<V> estimateFunction)
         {
-            return (null == original ? 0 : 
estimateFunction.applyAsLong(original))
-                 + (null == current  ? 0 : 
estimateFunction.applyAsLong(current));
+            return (null == current  ? 0 : 
estimateFunction.applyAsLong(current));
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 718c669f47..196d5d4913 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -79,11 +79,8 @@ import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Promise;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-import static accord.primitives.SaveStatus.Applying;
 import static accord.primitives.Status.Committed;
 import static accord.primitives.Status.Invalidated;
-import static accord.primitives.Status.PreApplied;
-import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
 import static accord.utils.Invariants.checkState;
 
@@ -290,12 +287,12 @@ public class AccordCommandStore extends CommandStore
 
     @Nullable
     @VisibleForTesting
-    public Runnable appendToKeyspace(Command before, Command after)
+    public Runnable appendToKeyspace(Command after)
     {
         if (after.txnId().is(Routable.Domain.Key))
             return null;
 
-        Mutation mutation = AccordKeyspace.getCommandMutation(this.id, before, 
after, nextSystemTimestampMicros());
+        Mutation mutation = AccordKeyspace.getCommandMutation(this.id, after, 
nextSystemTimestampMicros());
 
         // TODO (required): make sure we test recovering when this has failed 
to be persisted
         if (null != mutation)
@@ -360,14 +357,14 @@ public class AccordCommandStore extends CommandStore
     }
 
     @Nullable
-    private Runnable saveTimestampsForKey(TimestampsForKey before, 
TimestampsForKey after)
+    private Runnable saveTimestampsForKey(TimestampsForKey after)
     {
-        Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id, 
before, after, nextSystemTimestampMicros());
+        Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id, 
after, nextSystemTimestampMicros());
         return null != mutation ? mutation::applyUnsafe : null;
     }
 
     @Nullable
-    private Runnable saveCommandsForKey(CommandsForKey before, CommandsForKey 
after)
+    private Runnable saveCommandsForKey(CommandsForKey after)
     {
         Mutation mutation = AccordKeyspace.getCommandsForKeyMutation(id, 
after, nextSystemTimestampMicros());
         return null != mutation ? mutation::applyUnsafe : null;
@@ -447,8 +444,8 @@ public class AccordCommandStore extends CommandStore
 
     public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext,
                                                  Map<TxnId, AccordSafeCommand> 
commands,
-                                                 NavigableMap<RoutingKey, 
AccordSafeTimestampsForKey> timestampsForKeys,
-                                                 NavigableMap<RoutingKey, 
AccordSafeCommandsForKey> commandsForKeys,
+                                                 Map<RoutingKey, 
AccordSafeTimestampsForKey> timestampsForKeys,
+                                                 Map<RoutingKey, 
AccordSafeCommandsForKey> commandsForKeys,
                                                  @Nullable 
AccordSafeCommandsForRanges commandsForRanges)
     {
         checkState(current == null);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index c0bf9c5a6b..26b868e5d1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.Set;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 725deb05fc..b3b04a5172 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -805,6 +805,18 @@ public class AccordKeyspace
         }
     }
 
+    private static <C, V> void addCell(ColumnMetadata column, Function<C, V> 
get, SerializeFunction<V> serialize, Row.Builder builder, long timestampMicros, 
int nowInSeconds, C current) throws IOException
+    {
+        V newValue = get.apply(current);
+        if (newValue == null) builder.addCell(tombstone(column, 
timestampMicros, nowInSeconds));
+        else builder.addCell(live(column, timestampMicros, 
serialize.apply(newValue)));
+    }
+
+    private static <C extends Command, V> void addCell(ColumnMetadata column, 
Function<C, V> get, LocalVersionedSerializer<V> serializer, Row.Builder 
builder, long timestampMicros, int nowInSeconds, C command) throws IOException
+    {
+        addCell(column, get, v -> serializeOrNull(v, serializer), builder, 
timestampMicros, nowInSeconds, command);
+    }
+
     private static <C extends Command, V> void 
addCellIfModified(ColumnMetadata column, Function<C, V> get, 
LocalVersionedSerializer<V> serializer, Row.Builder builder, long 
timestampMicros, int nowInSeconds, C original, C command) throws IOException
     {
         addCellIfModified(column, get, v -> serializeOrNull(v, serializer), 
builder, timestampMicros, nowInSeconds, original, command);
@@ -817,29 +829,34 @@ public class AccordKeyspace
         addCellIfModified(column, get, v -> accessor.valueOf(v.ordinal()), 
builder, timestampMicros, nowInSeconds, original, command);
     }
 
+    private static <C extends Command, V extends Enum<V>> void 
addEnumCell(ColumnMetadata column, Function<C, V> get, Row.Builder builder, 
long timestampMicros, int nowInSeconds, C command) throws IOException
+    {
+        // TODO: convert to byte arrays
+        ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
+        addCell(column, get, v -> accessor.valueOf(v.ordinal()), builder, 
timestampMicros, nowInSeconds, command);
+    }
+
     public static Mutation getCommandMutation(AccordCommandStore commandStore, 
AccordSafeCommand liveCommand, long timestampMicros)
     {
-        return getCommandMutation(commandStore.id(), liveCommand.original(), 
liveCommand.current(), timestampMicros);
+        return getCommandMutation(commandStore.id(), liveCommand.current(), 
timestampMicros);
     }
 
-    public static Mutation getCommandMutation(int storeId, Command original, 
Command command, long timestampMicros)
+    public static Mutation getCommandMutation(int storeId, Command command, 
long timestampMicros)
     {
         if (command.saveStatus() == SaveStatus.Uninitialised)
             return null;
 
         try
         {
-            Invariants.checkArgument(original != command);
-
             Row.Builder builder = BTreeRow.unsortedBuilder();
             builder.newRow(Clustering.EMPTY);
             int nowInSeconds = (int) 
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
             
builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestampMicros, 
nowInSeconds));
 
-            addEnumCellIfModified(CommandsColumns.durability, 
Command::durability, builder, timestampMicros, nowInSeconds, original, command);
-            addCellIfModified(CommandsColumns.participants, 
Command::participants, LocalVersionedSerializers.participants, builder, 
timestampMicros, nowInSeconds, original, command);
-            addEnumCellIfModified(CommandsColumns.status, Command::saveStatus, 
builder, timestampMicros, nowInSeconds, original, command);
-            addCellIfModified(CommandsColumns.execute_at, Command::executeAt, 
AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, 
original, command);
+            addEnumCell(CommandsColumns.durability, Command::durability, 
builder, timestampMicros, nowInSeconds, command);
+            addCell(CommandsColumns.participants, Command::participants, 
LocalVersionedSerializers.participants, builder, timestampMicros, nowInSeconds, 
command);
+            addEnumCell(CommandsColumns.status, Command::saveStatus, builder, 
timestampMicros, nowInSeconds, command);
+            addCell(CommandsColumns.execute_at, Command::executeAt, 
AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, 
command);
 
             Row row = builder.build();
             if (row.columnCount() == 0)
@@ -1080,11 +1097,10 @@ public class AccordKeyspace
         return (TokenKey) 
AccordRoutingKeyByteSource.Serializer.fromComparableBytes(ByteBufferAccessor.instance,
 tokenBytes, tableId, currentVersion, null);
     }
 
-    public static Mutation getTimestampsForKeyMutation(int storeId, 
TimestampsForKey original, TimestampsForKey current, long timestampMicros)
+    public static Mutation getTimestampsForKeyMutation(int storeId, 
TimestampsForKey current, long timestampMicros)
     {
         try
         {
-            Invariants.checkArgument(original != current);
             // TODO: convert to byte arrays
             ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
 
@@ -1093,9 +1109,9 @@ public class AccordKeyspace
             int nowInSeconds = (int) 
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
             LivenessInfo livenessInfo = LivenessInfo.create(timestampMicros, 
nowInSeconds);
             builder.addPrimaryKeyLivenessInfo(livenessInfo);
-            addCellIfModified(TimestampsForKeyColumns.last_executed_timestamp, 
TimestampsForKey::lastExecutedTimestamp, AccordKeyspace::serializeTimestamp, 
builder, timestampMicros, nowInSeconds, original, current);
-            addCellIfModified(TimestampsForKeyColumns.last_executed_micros, 
TimestampsForKey::rawLastExecutedHlc, accessor::valueOf, builder, 
timestampMicros, nowInSeconds, original, current);
-            addCellIfModified(TimestampsForKeyColumns.last_write_timestamp, 
TimestampsForKey::lastWriteTimestamp, AccordKeyspace::serializeTimestamp, 
builder, timestampMicros, nowInSeconds, original, current);
+            addCell(TimestampsForKeyColumns.last_executed_timestamp, 
TimestampsForKey::lastExecutedTimestamp, AccordKeyspace::serializeTimestamp, 
builder, timestampMicros, nowInSeconds, current);
+            addCell(TimestampsForKeyColumns.last_executed_micros, 
TimestampsForKey::rawLastExecutedHlc, accessor::valueOf, builder, 
timestampMicros, nowInSeconds, current);
+            addCell(TimestampsForKeyColumns.last_write_timestamp, 
TimestampsForKey::lastWriteTimestamp, AccordKeyspace::serializeTimestamp, 
builder, timestampMicros, nowInSeconds, current);
 
             Row row = builder.build();
             if (row.columnCount() == 0)
@@ -1113,7 +1129,7 @@ public class AccordKeyspace
 
     public static Mutation getTimestampsForKeyMutation(AccordCommandStore 
commandStore, AccordSafeTimestampsForKey liveTimestamps, long timestampMicros)
     {
-        return getTimestampsForKeyMutation(commandStore.id(), 
liveTimestamps.original(), liveTimestamps.current(), timestampMicros);
+        return getTimestampsForKeyMutation(commandStore.id(), 
liveTimestamps.current(), timestampMicros);
     }
 
     public static UntypedResultSet loadTimestampsForKeyRow(CommandStore 
commandStore, TokenKey key)
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index f97f8336b7..4025049d39 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -53,8 +53,8 @@ import accord.utils.Invariants;
 public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, 
AccordSafeCommandsForKey>
 {
     private final Map<TxnId, AccordSafeCommand> commands;
-    private final NavigableMap<RoutingKey, AccordSafeCommandsForKey> 
commandsForKeys;
-    private final NavigableMap<RoutingKey, AccordSafeTimestampsForKey> 
timestampsForKeys;
+    private final Map<RoutingKey, AccordSafeCommandsForKey> commandsForKeys;
+    private final Map<RoutingKey, AccordSafeTimestampsForKey> 
timestampsForKeys;
     private final @Nullable AccordSafeCommandsForRanges commandsForRanges;
     private final AccordCommandStore commandStore;
     private RangesForEpoch ranges;
@@ -62,8 +62,8 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
 
     private AccordSafeCommandStore(PreLoadContext context,
                                    Map<TxnId, AccordSafeCommand> commands,
-                                   NavigableMap<RoutingKey, 
AccordSafeTimestampsForKey> timestampsForKey,
-                                   NavigableMap<RoutingKey, 
AccordSafeCommandsForKey> commandsForKey,
+                                   Map<RoutingKey, AccordSafeTimestampsForKey> 
timestampsForKey,
+                                   Map<RoutingKey, AccordSafeCommandsForKey> 
commandsForKey,
                                    @Nullable AccordSafeCommandsForRanges 
commandsForRanges,
                                    AccordCommandStore commandStore)
     {
@@ -80,8 +80,8 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
 
     public static AccordSafeCommandStore create(PreLoadContext preLoadContext,
                                                 Map<TxnId, AccordSafeCommand> 
commands,
-                                                NavigableMap<RoutingKey, 
AccordSafeTimestampsForKey> timestampsForKey,
-                                                NavigableMap<RoutingKey, 
AccordSafeCommandsForKey> commandsForKey,
+                                                Map<RoutingKey, 
AccordSafeTimestampsForKey> timestampsForKey,
+                                                Map<RoutingKey, 
AccordSafeCommandsForKey> commandsForKey,
                                                 @Nullable 
AccordSafeCommandsForRanges commandsForRanges,
                                                 AccordCommandStore 
commandStore)
     {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index 019b888ded..ccbc90fdb2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -240,7 +240,7 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
         Class<S> valClass,
         Function<AccordCachingState<K, V>, S> safeRefFactory,
         Function<K, V> loadFunction,
-        BiFunction<V, V, Runnable> saveFunction,
+        Function<V, Runnable> saveFunction,
         BiFunction<K, V, Boolean> validateFunction,
         ToLongFunction<V> heapEstimator,
         AccordCachingState.Factory<K, V> nodeFactory)
@@ -262,7 +262,7 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
         Class<S> valClass,
         Function<AccordCachingState<K, V>, S> safeRefFactory,
         Function<K, V> loadFunction,
-        BiFunction<V, V, Runnable> saveFunction,
+        Function<V, Runnable> saveFunction,
         BiFunction<K, V, Boolean> validateFunction,
         ToLongFunction<V> heapEstimator)
     {
@@ -287,7 +287,7 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
         private final Class<K> keyClass;
         private final Function<AccordCachingState<K, V>, S> safeRefFactory;
         private Function<K, V> loadFunction;
-        private BiFunction<V, V, Runnable> saveFunction;
+        private Function<V, Runnable> saveFunction;
         private final BiFunction<K, V, Boolean> validateFunction;
         private final ToLongFunction<V> heapEstimator;
         private long bytesCached;
@@ -303,7 +303,7 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
             int index, Class<K> keyClass,
             Function<AccordCachingState<K, V>, S> safeRefFactory,
             Function<K, V> loadFunction,
-            BiFunction<V, V, Runnable> saveFunction,
+            Function<V, Runnable> saveFunction,
             BiFunction<K, V, Boolean> validateFunction,
             ToLongFunction<V> heapEstimator,
             AccordCachingState.Factory<K, V> nodeFactory)
@@ -643,7 +643,7 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
         }
 
         @VisibleForTesting
-        public void unsafeSetSaveFunction(BiFunction<V, V, Runnable> 
saveFunction)
+        public void unsafeSetSaveFunction(Function<V, Runnable> saveFunction)
         {
             this.saveFunction = saveFunction;
         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 6d8d2b8184..deec2b21ab 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -185,7 +185,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             @Override
             public SentinelKey deserialize(DataInputPlus in, int version) 
throws IOException
             {
-                TableId table = TableId.deserialize(in);
+                TableId table = TableId.deserialize(in).tryIntern();
                 boolean isMin = in.readBoolean();
                 return new SentinelKey(table, isMin);
             }
@@ -287,14 +287,14 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             @Override
             public TokenKey deserialize(DataInputPlus in, int version) throws 
IOException
             {
-                TableId table = TableId.deserialize(in);
+                TableId table = TableId.deserialize(in).tryIntern();
                 Token token = Token.compactSerializer.deserialize(in, 
getPartitioner(), version);
                 return new TokenKey(table, token);
             }
 
             public TokenKey fromBytes(ByteBuffer bytes, IPartitioner 
partitioner)
             {
-                TableId tableId = TableId.deserialize(bytes, 
ByteBufferAccessor.instance, 0);
+                TableId tableId = TableId.deserialize(bytes, 
ByteBufferAccessor.instance, 0).tryIntern();
                 bytes.position(tableId.serializedSize());
                 Token token = Token.compactSerializer.deserialize(bytes, 
partitioner);
                 return new TokenKey(tableId, token);
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java 
b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
index fc78fe6692..aaa1264ea0 100644
--- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -154,7 +154,7 @@ public final class PartitionKey extends AccordRoutableKey 
implements Key
         @Override
         public PartitionKey deserialize(DataInputPlus in, int version) throws 
IOException
         {
-            TableId tableId = TableId.deserialize(in);
+            TableId tableId = TableId.deserialize(in).tryIntern();
             IPartitioner partitioner = 
Schema.instance.getExistingTablePartitioner(tableId);
             DecoratedKey key = 
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
             return new PartitionKey(tableId, key);
@@ -162,7 +162,7 @@ public final class PartitionKey extends AccordRoutableKey 
implements Key
 
         public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, 
int offset) throws IOException
         {
-            TableId tableId = TableId.deserialize(src, accessor, offset);
+            TableId tableId = TableId.deserialize(src, accessor, 
offset).tryIntern();
             offset += tableId.serializedSize();
             TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
             int numBytes = accessor.getShort(src, offset);
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index cc5bee6d32..463350adf8 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -18,9 +18,7 @@
 package org.apache.cassandra.service.accord.async;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.TreeMap;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -39,13 +37,13 @@ import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChains;
+import org.agrona.collections.Object2ObjectHashMap;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordSafeCommand;
 import org.apache.cassandra.service.accord.AccordSafeCommandStore;
 import org.apache.cassandra.service.accord.AccordSafeCommandsForKey;
 import org.apache.cassandra.service.accord.AccordSafeCommandsForRanges;
-import org.apache.cassandra.service.accord.AccordSafeState;
 import org.apache.cassandra.service.accord.AccordSafeTimestampsForKey;
 import org.apache.cassandra.service.accord.SavedCommand;
 import org.apache.cassandra.utils.concurrent.Condition;
@@ -71,28 +69,28 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
 
     static class Context
     {
-        final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
-        final TreeMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey 
= new TreeMap<>();
-        final TreeMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey = 
new TreeMap<>();
+        final Object2ObjectHashMap<TxnId, AccordSafeCommand> commands = new 
Object2ObjectHashMap<>();
+        final Object2ObjectHashMap<RoutingKey, AccordSafeTimestampsForKey> 
timestampsForKey = new Object2ObjectHashMap<>();
+        final Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey> 
commandsForKey = new Object2ObjectHashMap<>();
         @Nullable
         AccordSafeCommandsForRanges commandsForRanges = null;
 
         void releaseResources(AccordCommandStore commandStore)
         {
             // TODO (expected): we should destructively iterate to avoid 
invoking second time in fail; or else read and set to null
-            commands.values().forEach(commandStore.commandCache()::release);
+            commands.forEach((k, v) -> commandStore.commandCache().release(v));
             commands.clear();
-            
timestampsForKey.values().forEach(commandStore.timestampsForKeyCache()::release);
+            timestampsForKey.forEach((k, v) -> 
commandStore.timestampsForKeyCache().release(v));
             timestampsForKey.clear();
-            
commandsForKey.values().forEach(commandStore.commandsForKeyCache()::release);
+            commandsForKey.forEach((k, v) -> 
commandStore.commandsForKeyCache().release(v));
             commandsForKey.clear();
         }
 
         void revertChanges()
         {
-            commands.values().forEach(AccordSafeState::revert);
-            timestampsForKey.values().forEach(AccordSafeState::revert);
-            commandsForKey.values().forEach(AccordSafeState::revert);
+            commands.forEach((k, v) -> v.revert());
+            timestampsForKey.forEach((k, v) -> v.revert());
+            commandsForKey.forEach((k, v) -> v.revert());
             if (commandsForRanges != null)
                 commandsForRanges.revert();
         }
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index a329be5864..ca59f202b7 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -522,7 +522,7 @@ public class CompactionAccordIteratorsTest
     private static BiConsumer<Command, Command> 
appendDiffToKeyspace(AccordCommandStore commandStore)
     {
         return (before, after) -> {
-            AccordKeyspace.getCommandMutation(commandStore.id(), before, 
after, commandStore.nextSystemTimestampMicros()).applyUnsafe();
+            AccordKeyspace.getCommandMutation(commandStore.id(), after, 
commandStore.nextSystemTimestampMicros()).applyUnsafe();
         };
     }
 
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
index d82f143054..6f10d977b2 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
@@ -183,7 +183,7 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 500, 
cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> instance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true, String::length);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString safeString1 = instance.acquire("1");
@@ -215,9 +215,9 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 500, 
cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> stringInstance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true,String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true,String::length);
         AccordStateCache.Instance<Integer, Integer, SafeInt> intInstance =
-            cache.instance(Integer.class, SafeInt.class, SafeInt::new, key -> 
key, (original, current) -> null, (k, v) -> true,ignored -> Integer.BYTES);
+            cache.instance(Integer.class, SafeInt.class, SafeInt::new, key -> 
key, (current) -> null, (k, v) -> true,ignored -> Integer.BYTES);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString safeString1 = stringInstance.acquire("1");
@@ -255,7 +255,7 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 
DEFAULT_NODE_SIZE * 5, cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> instance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true, String::length);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString[] items = new SafeString[3];
@@ -295,7 +295,7 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 
nodeSize(1) * 5, cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> instance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true, String::length);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString[] items = new SafeString[5];
@@ -341,7 +341,7 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 
nodeSize(1) * 4, cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> instance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true, String::length);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString[] items = new SafeString[5];
@@ -380,7 +380,7 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 
DEFAULT_NODE_SIZE * 4, cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> instance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true, String::length);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString safeString1 = instance.acquire("0");
@@ -411,7 +411,7 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 
nodeSize(1) * 3 + nodeSize(3), cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> instance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true, String::length);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString item = instance.acquire(Integer.toString(0));
@@ -450,7 +450,7 @@ public class AccordStateCacheTest
         ManualExecutor executor = new ManualExecutor();
         AccordStateCache cache = new AccordStateCache(executor, executor, 500, 
cacheMetrics);
         AccordStateCache.Instance<String, String, SafeString> instance =
-            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+            cache.instance(String.class, SafeString.class, SafeString::new, 
key -> key, (current) -> null, (k, v) -> true, String::length);
         assertCacheState(cache, 0, 0, 0);
 
         SafeString safeString = instance.acquire("1");
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 680777bde8..5a09a23134 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service.accord.async;
 
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -155,7 +156,7 @@ public class AsyncLoaderTest
         timestamps.preExecute();
         timestamps.initialize();
 
-        AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), null, 
timestamps.current(), commandStore.nextSystemTimestampMicros()).apply();
+        AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), 
timestamps.current(), commandStore.nextSystemTimestampMicros()).apply();
 
         // resources are on disk only, so the loader should suspend...
         AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
RoutingKeys.of(key), TIMESTAMPS);
@@ -203,7 +204,7 @@ public class AsyncLoaderTest
         testLoad(executor, safeCommand, notDefined(txnId, txn));
         commandCache.release(safeCommand);
 
-        AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), null, 
new TimestampsForKey(key), commandStore.nextSystemTimestampMicros()).apply();
+        AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), new 
TimestampsForKey(key), commandStore.nextSystemTimestampMicros()).apply();
 
         // resources are on disk only, so the loader should suspend...
         AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
RoutingKeys.of(key), TIMESTAMPS);
@@ -353,7 +354,7 @@ public class AsyncLoaderTest
         // acquire / release
 
         commandCache.unsafeSetLoadFunction(id -> notDefined(id, txn));
-        commandCache.unsafeSetSaveFunction((before, after) -> () -> { throw 
new AssertionError("nodes expected to be saved manually"); });
+        commandCache.unsafeSetSaveFunction((after) -> () -> { throw new 
AssertionError("nodes expected to be saved manually"); });
 
         AccordSafeCommand safeCommand = commandCache.acquire(txnId);
         testLoad(executor, safeCommand, notDefined(txnId, txn));
@@ -361,7 +362,7 @@ public class AsyncLoaderTest
         commandCache.release(safeCommand);
 
         Assert.assertEquals(AccordCachingState.Status.MODIFIED, 
commandCache.getUnsafe(txnId).status());
-        commandCache.getUnsafe(txnId).save(executor, (before, after) -> () -> 
{});
+        commandCache.getUnsafe(txnId).save(executor, (after) -> () -> {});
         Assert.assertEquals(AccordCachingState.Status.SAVING, 
commandCache.getUnsafe(txnId).status());
 
         // since the command is still saving, the loader shouldn't be able to 
acquire a reference
@@ -402,7 +403,7 @@ public class AsyncLoaderTest
         inProgressCFKSaveTest(TIMESTAMPS, 
AccordCommandStore::timestampsForKeyCache, context -> context.timestampsForKey, 
TimestampsForKey::new, (tfk, c) -> new TimestampsForKey(tfk.key(), 
c.executeAt(), c.executeAt().hlc(), c.txnId(), c.executeAt()));
     }
 
-    private <T1, T2 extends AccordSafeState<RoutingKey, T1>, C extends 
AccordStateCache.Instance<RoutingKey, T1, T2>>  void 
inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C> 
getter, Function<Context, TreeMap<?, ?>> inContext, Function<RoutingKey, T1> 
initialiser, BiFunction<T1, Command, T1> update)
+    private <T1, T2 extends AccordSafeState<RoutingKey, T1>, C extends 
AccordStateCache.Instance<RoutingKey, T1, T2>>  void 
inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C> 
getter, Function<Context, Map<?, ?>> inContext, Function<RoutingKey, T1> 
initialiser, BiFunction<T1, Command, T1> update)
     {
         AtomicLong clock = new AtomicLong(0);
         ManualExecutor executor = new ManualExecutor();
@@ -410,7 +411,7 @@ public class AsyncLoaderTest
         createAccordCommandStore(clock::incrementAndGet, "ks", "tbl", 
executor, executor);
 
         C cache = getter.apply(commandStore);
-        cache.unsafeSetSaveFunction((before, after) -> () -> { throw new 
AssertionError("nodes expected to be saved manually"); });
+        cache.unsafeSetSaveFunction((after) -> () -> { throw new 
AssertionError("nodes expected to be saved manually"); });
 
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
         PartialTxn txn = createPartialTxn(0);
@@ -424,7 +425,7 @@ public class AsyncLoaderTest
         cache.release(safe);
 
         Assert.assertEquals(AccordCachingState.Status.MODIFIED, 
cache.getUnsafe(key).status());
-        cache.getUnsafe(key).save(executor, (before, after) -> () -> {});
+        cache.getUnsafe(key).save(executor, (after) -> () -> {});
         Assert.assertEquals(AccordCachingState.Status.SAVING, 
cache.getUnsafe(key).status());
 
         // since the command is still saving, the loader shouldn't be able to 
acquire a reference


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to