This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a0af69846c4e4234a2b6e389008e075511a6742a Author: Benedict Elliott Smith <[email protected]> AuthorDate: Thu Oct 10 10:19:50 2024 +0100 Halve cache memory consumption by not retaining 'original' to diff; dedup RoutingKey tableId; avoid calculating rejectsFastPath in more cases; delay retry of fetchMajorityDeps; fix SetShardDurable marking shards durable --- modules/accord | 2 +- .../apache/cassandra/journal/EntrySerializer.java | 1 - .../apache/cassandra/journal/InMemoryIndex.java | 1 - src/java/org/apache/cassandra/schema/TableId.java | 6 +++ .../service/accord/AccordCachingState.java | 23 ++++------- .../service/accord/AccordCommandStore.java | 17 ++++----- .../cassandra/service/accord/AccordJournal.java | 1 - .../cassandra/service/accord/AccordKeyspace.java | 44 +++++++++++++++------- .../service/accord/AccordSafeCommandStore.java | 12 +++--- .../cassandra/service/accord/AccordStateCache.java | 10 ++--- .../service/accord/api/AccordRoutingKey.java | 6 +-- .../cassandra/service/accord/api/PartitionKey.java | 4 +- .../service/accord/async/AsyncOperation.java | 22 +++++------ .../compaction/CompactionAccordIteratorsTest.java | 2 +- .../service/accord/AccordStateCacheTest.java | 18 ++++----- .../service/accord/async/AsyncLoaderTest.java | 15 ++++---- 16 files changed, 96 insertions(+), 88 deletions(-) diff --git a/modules/accord b/modules/accord index 841e139bc8..8bce46bee7 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 841e139bc8a974ac674ce8eae847bd52255ca544 +Subproject commit 8bce46bee7497262a8c16c6b779c08558968604f diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java index 48ef59a6e4..2a707e7d73 100644 --- a/src/java/org/apache/cassandra/journal/EntrySerializer.java +++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.Set; import java.util.zip.CRC32; -import accord.utils.Invariants; import org.agrona.collections.IntHashSet; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputBuffer; diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java b/src/java/org/apache/cassandra/journal/InMemoryIndex.java index 8141f338a9..77fd7352ee 100644 --- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java +++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; -import accord.utils.Invariants; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileOutputStreamPlus; import org.apache.cassandra.journal.StaticSegment.SequentialReader; diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java index 302d7db6bf..03fd3dc490 100644 --- a/src/java/org/apache/cassandra/schema/TableId.java +++ b/src/java/org/apache/cassandra/schema/TableId.java @@ -200,6 +200,12 @@ public class TableId implements Comparable<TableId> return new TableId(new UUID(accessor.getLong(src, offset), accessor.getLong(src, offset + TypeSizes.LONG_SIZE))); } + public TableId tryIntern() + { + TableMetadata metadata = Schema.instance.getTableMetadata(this); + return metadata == null ? this : metadata.id; + } + @Override public int compareTo(TableId o) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java index 50a48be1fb..b8bb61e00c 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service.accord; import java.util.concurrent.Callable; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.ToLongFunction; @@ -208,10 +207,10 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode * has completed, the state save will have either completed or failed. */ @VisibleForTesting - public void save(ExecutorPlus executor, BiFunction<?, ?, Runnable> saveFunction) + public void save(ExecutorPlus executor, Function<?, Runnable> saveFunction) { @SuppressWarnings("unchecked") - State<K, V> savingOrLoaded = state.save((BiFunction<V, V, Runnable>) saveFunction); + State<K, V> savingOrLoaded = state.save((Function<V, Runnable>) saveFunction); if (savingOrLoaded.status() == SAVING) executor.submit(savingOrLoaded.saving()); state(savingOrLoaded); @@ -319,7 +318,7 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode throw illegalState(this, "set(value)"); } - default State<K, V> save(BiFunction<V, V, Runnable> saveFunction) + default State<K, V> save(Function<V, Runnable> saveFunction) { throw illegalState(this, "save(saveFunction)"); } @@ -447,7 +446,7 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode @Override public State<K, V> set(V value) { - return value == original ? this : new Modified<>(original, value); + return value == original ? this : new Modified<>(value); } @Override @@ -499,12 +498,10 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode static class Modified<K, V> implements State<K, V> { - final V original; V current; - Modified(V original, V current) + Modified(V current) { - this.original = original; this.current = current; } @@ -523,17 +520,14 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode @Override public State<K, V> set(V value) { - if (value == original) // change reverted - return new Loaded<>(original); - current = value; return this; } @Override - public State<K, V> save(BiFunction<V, V, Runnable> saveFunction) + public State<K, V> save(Function<V, Runnable> saveFunction) { - Runnable runnable = saveFunction.apply(original, current); + Runnable runnable = saveFunction.apply(current); if (null == runnable) // null mutation -> null Runnable -> no change on disk return new Loaded<>(current); else @@ -543,8 +537,7 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode @Override public long estimateOnHeapSize(ToLongFunction<V> estimateFunction) { - return (null == original ? 0 : estimateFunction.applyAsLong(original)) - + (null == current ? 0 : estimateFunction.applyAsLong(current)); + return (null == current ? 0 : estimateFunction.applyAsLong(current)); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 718c669f47..196d5d4913 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -79,11 +79,8 @@ import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Promise; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -import static accord.primitives.SaveStatus.Applying; import static accord.primitives.Status.Committed; import static accord.primitives.Status.Invalidated; -import static accord.primitives.Status.PreApplied; -import static accord.primitives.Status.Stable; import static accord.primitives.Status.Truncated; import static accord.utils.Invariants.checkState; @@ -290,12 +287,12 @@ public class AccordCommandStore extends CommandStore @Nullable @VisibleForTesting - public Runnable appendToKeyspace(Command before, Command after) + public Runnable appendToKeyspace(Command after) { if (after.txnId().is(Routable.Domain.Key)) return null; - Mutation mutation = AccordKeyspace.getCommandMutation(this.id, before, after, nextSystemTimestampMicros()); + Mutation mutation = AccordKeyspace.getCommandMutation(this.id, after, nextSystemTimestampMicros()); // TODO (required): make sure we test recovering when this has failed to be persisted if (null != mutation) @@ -360,14 +357,14 @@ public class AccordCommandStore extends CommandStore } @Nullable - private Runnable saveTimestampsForKey(TimestampsForKey before, TimestampsForKey after) + private Runnable saveTimestampsForKey(TimestampsForKey after) { - Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id, before, after, nextSystemTimestampMicros()); + Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id, after, nextSystemTimestampMicros()); return null != mutation ? mutation::applyUnsafe : null; } @Nullable - private Runnable saveCommandsForKey(CommandsForKey before, CommandsForKey after) + private Runnable saveCommandsForKey(CommandsForKey after) { Mutation mutation = AccordKeyspace.getCommandsForKeyMutation(id, after, nextSystemTimestampMicros()); return null != mutation ? mutation::applyUnsafe : null; @@ -447,8 +444,8 @@ public class AccordCommandStore extends CommandStore public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext, Map<TxnId, AccordSafeCommand> commands, - NavigableMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKeys, - NavigableMap<RoutingKey, AccordSafeCommandsForKey> commandsForKeys, + Map<RoutingKey, AccordSafeTimestampsForKey> timestampsForKeys, + Map<RoutingKey, AccordSafeCommandsForKey> commandsForKeys, @Nullable AccordSafeCommandsForRanges commandsForRanges) { checkState(current == null); diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index c0bf9c5a6b..26b868e5d1 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.NavigableMap; import java.util.Set; diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 725deb05fc..b3b04a5172 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -805,6 +805,18 @@ public class AccordKeyspace } } + private static <C, V> void addCell(ColumnMetadata column, Function<C, V> get, SerializeFunction<V> serialize, Row.Builder builder, long timestampMicros, int nowInSeconds, C current) throws IOException + { + V newValue = get.apply(current); + if (newValue == null) builder.addCell(tombstone(column, timestampMicros, nowInSeconds)); + else builder.addCell(live(column, timestampMicros, serialize.apply(newValue))); + } + + private static <C extends Command, V> void addCell(ColumnMetadata column, Function<C, V> get, LocalVersionedSerializer<V> serializer, Row.Builder builder, long timestampMicros, int nowInSeconds, C command) throws IOException + { + addCell(column, get, v -> serializeOrNull(v, serializer), builder, timestampMicros, nowInSeconds, command); + } + private static <C extends Command, V> void addCellIfModified(ColumnMetadata column, Function<C, V> get, LocalVersionedSerializer<V> serializer, Row.Builder builder, long timestampMicros, int nowInSeconds, C original, C command) throws IOException { addCellIfModified(column, get, v -> serializeOrNull(v, serializer), builder, timestampMicros, nowInSeconds, original, command); @@ -817,29 +829,34 @@ public class AccordKeyspace addCellIfModified(column, get, v -> accessor.valueOf(v.ordinal()), builder, timestampMicros, nowInSeconds, original, command); } + private static <C extends Command, V extends Enum<V>> void addEnumCell(ColumnMetadata column, Function<C, V> get, Row.Builder builder, long timestampMicros, int nowInSeconds, C command) throws IOException + { + // TODO: convert to byte arrays + ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance; + addCell(column, get, v -> accessor.valueOf(v.ordinal()), builder, timestampMicros, nowInSeconds, command); + } + public static Mutation getCommandMutation(AccordCommandStore commandStore, AccordSafeCommand liveCommand, long timestampMicros) { - return getCommandMutation(commandStore.id(), liveCommand.original(), liveCommand.current(), timestampMicros); + return getCommandMutation(commandStore.id(), liveCommand.current(), timestampMicros); } - public static Mutation getCommandMutation(int storeId, Command original, Command command, long timestampMicros) + public static Mutation getCommandMutation(int storeId, Command command, long timestampMicros) { if (command.saveStatus() == SaveStatus.Uninitialised) return null; try { - Invariants.checkArgument(original != command); - Row.Builder builder = BTreeRow.unsortedBuilder(); builder.newRow(Clustering.EMPTY); int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros); builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestampMicros, nowInSeconds)); - addEnumCellIfModified(CommandsColumns.durability, Command::durability, builder, timestampMicros, nowInSeconds, original, command); - addCellIfModified(CommandsColumns.participants, Command::participants, LocalVersionedSerializers.participants, builder, timestampMicros, nowInSeconds, original, command); - addEnumCellIfModified(CommandsColumns.status, Command::saveStatus, builder, timestampMicros, nowInSeconds, original, command); - addCellIfModified(CommandsColumns.execute_at, Command::executeAt, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, original, command); + addEnumCell(CommandsColumns.durability, Command::durability, builder, timestampMicros, nowInSeconds, command); + addCell(CommandsColumns.participants, Command::participants, LocalVersionedSerializers.participants, builder, timestampMicros, nowInSeconds, command); + addEnumCell(CommandsColumns.status, Command::saveStatus, builder, timestampMicros, nowInSeconds, command); + addCell(CommandsColumns.execute_at, Command::executeAt, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, command); Row row = builder.build(); if (row.columnCount() == 0) @@ -1080,11 +1097,10 @@ public class AccordKeyspace return (TokenKey) AccordRoutingKeyByteSource.Serializer.fromComparableBytes(ByteBufferAccessor.instance, tokenBytes, tableId, currentVersion, null); } - public static Mutation getTimestampsForKeyMutation(int storeId, TimestampsForKey original, TimestampsForKey current, long timestampMicros) + public static Mutation getTimestampsForKeyMutation(int storeId, TimestampsForKey current, long timestampMicros) { try { - Invariants.checkArgument(original != current); // TODO: convert to byte arrays ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance; @@ -1093,9 +1109,9 @@ public class AccordKeyspace int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros); LivenessInfo livenessInfo = LivenessInfo.create(timestampMicros, nowInSeconds); builder.addPrimaryKeyLivenessInfo(livenessInfo); - addCellIfModified(TimestampsForKeyColumns.last_executed_timestamp, TimestampsForKey::lastExecutedTimestamp, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, original, current); - addCellIfModified(TimestampsForKeyColumns.last_executed_micros, TimestampsForKey::rawLastExecutedHlc, accessor::valueOf, builder, timestampMicros, nowInSeconds, original, current); - addCellIfModified(TimestampsForKeyColumns.last_write_timestamp, TimestampsForKey::lastWriteTimestamp, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, original, current); + addCell(TimestampsForKeyColumns.last_executed_timestamp, TimestampsForKey::lastExecutedTimestamp, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, current); + addCell(TimestampsForKeyColumns.last_executed_micros, TimestampsForKey::rawLastExecutedHlc, accessor::valueOf, builder, timestampMicros, nowInSeconds, current); + addCell(TimestampsForKeyColumns.last_write_timestamp, TimestampsForKey::lastWriteTimestamp, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, current); Row row = builder.build(); if (row.columnCount() == 0) @@ -1113,7 +1129,7 @@ public class AccordKeyspace public static Mutation getTimestampsForKeyMutation(AccordCommandStore commandStore, AccordSafeTimestampsForKey liveTimestamps, long timestampMicros) { - return getTimestampsForKeyMutation(commandStore.id(), liveTimestamps.original(), liveTimestamps.current(), timestampMicros); + return getTimestampsForKeyMutation(commandStore.id(), liveTimestamps.current(), timestampMicros); } public static UntypedResultSet loadTimestampsForKeyRow(CommandStore commandStore, TokenKey key) diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index f97f8336b7..4025049d39 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -53,8 +53,8 @@ import accord.utils.Invariants; public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, AccordSafeCommandsForKey> { private final Map<TxnId, AccordSafeCommand> commands; - private final NavigableMap<RoutingKey, AccordSafeCommandsForKey> commandsForKeys; - private final NavigableMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKeys; + private final Map<RoutingKey, AccordSafeCommandsForKey> commandsForKeys; + private final Map<RoutingKey, AccordSafeTimestampsForKey> timestampsForKeys; private final @Nullable AccordSafeCommandsForRanges commandsForRanges; private final AccordCommandStore commandStore; private RangesForEpoch ranges; @@ -62,8 +62,8 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC private AccordSafeCommandStore(PreLoadContext context, Map<TxnId, AccordSafeCommand> commands, - NavigableMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey, - NavigableMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey, + Map<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey, + Map<RoutingKey, AccordSafeCommandsForKey> commandsForKey, @Nullable AccordSafeCommandsForRanges commandsForRanges, AccordCommandStore commandStore) { @@ -80,8 +80,8 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC public static AccordSafeCommandStore create(PreLoadContext preLoadContext, Map<TxnId, AccordSafeCommand> commands, - NavigableMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey, - NavigableMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey, + Map<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey, + Map<RoutingKey, AccordSafeCommandsForKey> commandsForKey, @Nullable AccordSafeCommandsForRanges commandsForRanges, AccordCommandStore commandStore) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java index 019b888ded..ccbc90fdb2 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java +++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java @@ -240,7 +240,7 @@ public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,? Class<S> valClass, Function<AccordCachingState<K, V>, S> safeRefFactory, Function<K, V> loadFunction, - BiFunction<V, V, Runnable> saveFunction, + Function<V, Runnable> saveFunction, BiFunction<K, V, Boolean> validateFunction, ToLongFunction<V> heapEstimator, AccordCachingState.Factory<K, V> nodeFactory) @@ -262,7 +262,7 @@ public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,? Class<S> valClass, Function<AccordCachingState<K, V>, S> safeRefFactory, Function<K, V> loadFunction, - BiFunction<V, V, Runnable> saveFunction, + Function<V, Runnable> saveFunction, BiFunction<K, V, Boolean> validateFunction, ToLongFunction<V> heapEstimator) { @@ -287,7 +287,7 @@ public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,? private final Class<K> keyClass; private final Function<AccordCachingState<K, V>, S> safeRefFactory; private Function<K, V> loadFunction; - private BiFunction<V, V, Runnable> saveFunction; + private Function<V, Runnable> saveFunction; private final BiFunction<K, V, Boolean> validateFunction; private final ToLongFunction<V> heapEstimator; private long bytesCached; @@ -303,7 +303,7 @@ public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,? int index, Class<K> keyClass, Function<AccordCachingState<K, V>, S> safeRefFactory, Function<K, V> loadFunction, - BiFunction<V, V, Runnable> saveFunction, + Function<V, Runnable> saveFunction, BiFunction<K, V, Boolean> validateFunction, ToLongFunction<V> heapEstimator, AccordCachingState.Factory<K, V> nodeFactory) @@ -643,7 +643,7 @@ public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,? } @VisibleForTesting - public void unsafeSetSaveFunction(BiFunction<V, V, Runnable> saveFunction) + public void unsafeSetSaveFunction(Function<V, Runnable> saveFunction) { this.saveFunction = saveFunction; } diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java index 6d8d2b8184..deec2b21ab 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java @@ -185,7 +185,7 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout @Override public SentinelKey deserialize(DataInputPlus in, int version) throws IOException { - TableId table = TableId.deserialize(in); + TableId table = TableId.deserialize(in).tryIntern(); boolean isMin = in.readBoolean(); return new SentinelKey(table, isMin); } @@ -287,14 +287,14 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout @Override public TokenKey deserialize(DataInputPlus in, int version) throws IOException { - TableId table = TableId.deserialize(in); + TableId table = TableId.deserialize(in).tryIntern(); Token token = Token.compactSerializer.deserialize(in, getPartitioner(), version); return new TokenKey(table, token); } public TokenKey fromBytes(ByteBuffer bytes, IPartitioner partitioner) { - TableId tableId = TableId.deserialize(bytes, ByteBufferAccessor.instance, 0); + TableId tableId = TableId.deserialize(bytes, ByteBufferAccessor.instance, 0).tryIntern(); bytes.position(tableId.serializedSize()); Token token = Token.compactSerializer.deserialize(bytes, partitioner); return new TokenKey(tableId, token); diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java index fc78fe6692..aaa1264ea0 100644 --- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java @@ -154,7 +154,7 @@ public final class PartitionKey extends AccordRoutableKey implements Key @Override public PartitionKey deserialize(DataInputPlus in, int version) throws IOException { - TableId tableId = TableId.deserialize(in); + TableId tableId = TableId.deserialize(in).tryIntern(); IPartitioner partitioner = Schema.instance.getExistingTablePartitioner(tableId); DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); return new PartitionKey(tableId, key); @@ -162,7 +162,7 @@ public final class PartitionKey extends AccordRoutableKey implements Key public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, int offset) throws IOException { - TableId tableId = TableId.deserialize(src, accessor, offset); + TableId tableId = TableId.deserialize(src, accessor, offset).tryIntern(); offset += tableId.serializedSize(); TableMetadata metadata = Schema.instance.getTableMetadata(tableId); int numBytes = accessor.getShort(src, offset); diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index cc5bee6d32..463350adf8 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -18,9 +18,7 @@ package org.apache.cassandra.service.accord.async; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.TreeMap; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -39,13 +37,13 @@ import accord.primitives.TxnId; import accord.primitives.Unseekables; import accord.utils.Invariants; import accord.utils.async.AsyncChains; +import org.agrona.collections.Object2ObjectHashMap; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordSafeCommand; import org.apache.cassandra.service.accord.AccordSafeCommandStore; import org.apache.cassandra.service.accord.AccordSafeCommandsForKey; import org.apache.cassandra.service.accord.AccordSafeCommandsForRanges; -import org.apache.cassandra.service.accord.AccordSafeState; import org.apache.cassandra.service.accord.AccordSafeTimestampsForKey; import org.apache.cassandra.service.accord.SavedCommand; import org.apache.cassandra.utils.concurrent.Condition; @@ -71,28 +69,28 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R static class Context { - final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>(); - final TreeMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey = new TreeMap<>(); - final TreeMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey = new TreeMap<>(); + final Object2ObjectHashMap<TxnId, AccordSafeCommand> commands = new Object2ObjectHashMap<>(); + final Object2ObjectHashMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey = new Object2ObjectHashMap<>(); + final Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey = new Object2ObjectHashMap<>(); @Nullable AccordSafeCommandsForRanges commandsForRanges = null; void releaseResources(AccordCommandStore commandStore) { // TODO (expected): we should destructively iterate to avoid invoking second time in fail; or else read and set to null - commands.values().forEach(commandStore.commandCache()::release); + commands.forEach((k, v) -> commandStore.commandCache().release(v)); commands.clear(); - timestampsForKey.values().forEach(commandStore.timestampsForKeyCache()::release); + timestampsForKey.forEach((k, v) -> commandStore.timestampsForKeyCache().release(v)); timestampsForKey.clear(); - commandsForKey.values().forEach(commandStore.commandsForKeyCache()::release); + commandsForKey.forEach((k, v) -> commandStore.commandsForKeyCache().release(v)); commandsForKey.clear(); } void revertChanges() { - commands.values().forEach(AccordSafeState::revert); - timestampsForKey.values().forEach(AccordSafeState::revert); - commandsForKey.values().forEach(AccordSafeState::revert); + commands.forEach((k, v) -> v.revert()); + timestampsForKey.forEach((k, v) -> v.revert()); + commandsForKey.forEach((k, v) -> v.revert()); if (commandsForRanges != null) commandsForRanges.revert(); } diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index a329be5864..ca59f202b7 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -522,7 +522,7 @@ public class CompactionAccordIteratorsTest private static BiConsumer<Command, Command> appendDiffToKeyspace(AccordCommandStore commandStore) { return (before, after) -> { - AccordKeyspace.getCommandMutation(commandStore.id(), before, after, commandStore.nextSystemTimestampMicros()).applyUnsafe(); + AccordKeyspace.getCommandMutation(commandStore.id(), after, commandStore.nextSystemTimestampMicros()).applyUnsafe(); }; } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java index d82f143054..6f10d977b2 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java @@ -183,7 +183,7 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, 500, cacheMetrics); AccordStateCache.Instance<String, String, SafeString> instance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true, String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true, String::length); assertCacheState(cache, 0, 0, 0); SafeString safeString1 = instance.acquire("1"); @@ -215,9 +215,9 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, 500, cacheMetrics); AccordStateCache.Instance<String, String, SafeString> stringInstance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true,String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true,String::length); AccordStateCache.Instance<Integer, Integer, SafeInt> intInstance = - cache.instance(Integer.class, SafeInt.class, SafeInt::new, key -> key, (original, current) -> null, (k, v) -> true,ignored -> Integer.BYTES); + cache.instance(Integer.class, SafeInt.class, SafeInt::new, key -> key, (current) -> null, (k, v) -> true,ignored -> Integer.BYTES); assertCacheState(cache, 0, 0, 0); SafeString safeString1 = stringInstance.acquire("1"); @@ -255,7 +255,7 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, DEFAULT_NODE_SIZE * 5, cacheMetrics); AccordStateCache.Instance<String, String, SafeString> instance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true, String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true, String::length); assertCacheState(cache, 0, 0, 0); SafeString[] items = new SafeString[3]; @@ -295,7 +295,7 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, nodeSize(1) * 5, cacheMetrics); AccordStateCache.Instance<String, String, SafeString> instance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true, String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true, String::length); assertCacheState(cache, 0, 0, 0); SafeString[] items = new SafeString[5]; @@ -341,7 +341,7 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, nodeSize(1) * 4, cacheMetrics); AccordStateCache.Instance<String, String, SafeString> instance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true, String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true, String::length); assertCacheState(cache, 0, 0, 0); SafeString[] items = new SafeString[5]; @@ -380,7 +380,7 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, DEFAULT_NODE_SIZE * 4, cacheMetrics); AccordStateCache.Instance<String, String, SafeString> instance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true, String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true, String::length); assertCacheState(cache, 0, 0, 0); SafeString safeString1 = instance.acquire("0"); @@ -411,7 +411,7 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, nodeSize(1) * 3 + nodeSize(3), cacheMetrics); AccordStateCache.Instance<String, String, SafeString> instance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true, String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true, String::length); assertCacheState(cache, 0, 0, 0); SafeString item = instance.acquire(Integer.toString(0)); @@ -450,7 +450,7 @@ public class AccordStateCacheTest ManualExecutor executor = new ManualExecutor(); AccordStateCache cache = new AccordStateCache(executor, executor, 500, cacheMetrics); AccordStateCache.Instance<String, String, SafeString> instance = - cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (original, current) -> null, (k, v) -> true, String::length); + cache.instance(String.class, SafeString.class, SafeString::new, key -> key, (current) -> null, (k, v) -> true, String::length); assertCacheState(cache, 0, 0, 0); SafeString safeString = instance.acquire("1"); diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java index 680777bde8..5a09a23134 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.accord.async; +import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -155,7 +156,7 @@ public class AsyncLoaderTest timestamps.preExecute(); timestamps.initialize(); - AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), null, timestamps.current(), commandStore.nextSystemTimestampMicros()).apply(); + AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), timestamps.current(), commandStore.nextSystemTimestampMicros()).apply(); // resources are on disk only, so the loader should suspend... AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), RoutingKeys.of(key), TIMESTAMPS); @@ -203,7 +204,7 @@ public class AsyncLoaderTest testLoad(executor, safeCommand, notDefined(txnId, txn)); commandCache.release(safeCommand); - AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), null, new TimestampsForKey(key), commandStore.nextSystemTimestampMicros()).apply(); + AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), new TimestampsForKey(key), commandStore.nextSystemTimestampMicros()).apply(); // resources are on disk only, so the loader should suspend... AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), RoutingKeys.of(key), TIMESTAMPS); @@ -353,7 +354,7 @@ public class AsyncLoaderTest // acquire / release commandCache.unsafeSetLoadFunction(id -> notDefined(id, txn)); - commandCache.unsafeSetSaveFunction((before, after) -> () -> { throw new AssertionError("nodes expected to be saved manually"); }); + commandCache.unsafeSetSaveFunction((after) -> () -> { throw new AssertionError("nodes expected to be saved manually"); }); AccordSafeCommand safeCommand = commandCache.acquire(txnId); testLoad(executor, safeCommand, notDefined(txnId, txn)); @@ -361,7 +362,7 @@ public class AsyncLoaderTest commandCache.release(safeCommand); Assert.assertEquals(AccordCachingState.Status.MODIFIED, commandCache.getUnsafe(txnId).status()); - commandCache.getUnsafe(txnId).save(executor, (before, after) -> () -> {}); + commandCache.getUnsafe(txnId).save(executor, (after) -> () -> {}); Assert.assertEquals(AccordCachingState.Status.SAVING, commandCache.getUnsafe(txnId).status()); // since the command is still saving, the loader shouldn't be able to acquire a reference @@ -402,7 +403,7 @@ public class AsyncLoaderTest inProgressCFKSaveTest(TIMESTAMPS, AccordCommandStore::timestampsForKeyCache, context -> context.timestampsForKey, TimestampsForKey::new, (tfk, c) -> new TimestampsForKey(tfk.key(), c.executeAt(), c.executeAt().hlc(), c.txnId(), c.executeAt())); } - private <T1, T2 extends AccordSafeState<RoutingKey, T1>, C extends AccordStateCache.Instance<RoutingKey, T1, T2>> void inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C> getter, Function<Context, TreeMap<?, ?>> inContext, Function<RoutingKey, T1> initialiser, BiFunction<T1, Command, T1> update) + private <T1, T2 extends AccordSafeState<RoutingKey, T1>, C extends AccordStateCache.Instance<RoutingKey, T1, T2>> void inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C> getter, Function<Context, Map<?, ?>> inContext, Function<RoutingKey, T1> initialiser, BiFunction<T1, Command, T1> update) { AtomicLong clock = new AtomicLong(0); ManualExecutor executor = new ManualExecutor(); @@ -410,7 +411,7 @@ public class AsyncLoaderTest createAccordCommandStore(clock::incrementAndGet, "ks", "tbl", executor, executor); C cache = getter.apply(commandStore); - cache.unsafeSetSaveFunction((before, after) -> () -> { throw new AssertionError("nodes expected to be saved manually"); }); + cache.unsafeSetSaveFunction((after) -> () -> { throw new AssertionError("nodes expected to be saved manually"); }); TxnId txnId = txnId(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(0); @@ -424,7 +425,7 @@ public class AsyncLoaderTest cache.release(safe); Assert.assertEquals(AccordCachingState.Status.MODIFIED, cache.getUnsafe(key).status()); - cache.getUnsafe(key).save(executor, (before, after) -> () -> {}); + cache.getUnsafe(key).save(executor, (after) -> () -> {}); Assert.assertEquals(AccordCachingState.Status.SAVING, cache.getUnsafe(key).status()); // since the command is still saving, the loader shouldn't be able to acquire a reference --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
