This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 868bbfdf1c Halve cache memory consumption by not retaining 'original'
to diff; dedup RoutingKey tableId; avoid calculating rejectsFastPath in more
cases; delay retry of fetchMajorityDeps
868bbfdf1c is described below
commit 868bbfdf1c43cd23c05252fd5143991103720533
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Oct 10 10:19:50 2024 +0100
Halve cache memory consumption by not retaining 'original' to diff; dedup
RoutingKey tableId; avoid calculating rejectsFastPath in more cases; delay
retry of fetchMajorityDeps
---
modules/accord | 2 +-
src/java/org/apache/cassandra/schema/TableId.java | 6 +++
.../service/accord/AccordCachingState.java | 24 +++++-------
.../service/accord/AccordCommandStore.java | 17 ++++-----
.../cassandra/service/accord/AccordKeyspace.java | 44 +++++++++++++++-------
.../service/accord/AccordSafeCommandStore.java | 12 +++---
.../cassandra/service/accord/AccordStateCache.java | 10 ++---
.../service/accord/api/AccordRoutingKey.java | 6 +--
.../cassandra/service/accord/api/PartitionKey.java | 4 +-
.../service/accord/async/AsyncOperation.java | 22 +++++------
.../compaction/CompactionAccordIteratorsTest.java | 2 +-
.../service/accord/AccordStateCacheTest.java | 18 ++++-----
.../service/accord/async/AsyncLoaderTest.java | 15 ++++----
13 files changed, 98 insertions(+), 84 deletions(-)
diff --git a/modules/accord b/modules/accord
index 841e139bc8..9ccc44c7f2 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 841e139bc8a974ac674ce8eae847bd52255ca544
+Subproject commit 9ccc44c7f2f6b3bbbfcf7b4858de5edae2b41e96
diff --git a/src/java/org/apache/cassandra/schema/TableId.java
b/src/java/org/apache/cassandra/schema/TableId.java
index 302d7db6bf..03fd3dc490 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -200,6 +200,12 @@ public class TableId implements Comparable<TableId>
return new TableId(new UUID(accessor.getLong(src, offset),
accessor.getLong(src, offset + TypeSizes.LONG_SIZE)));
}
+ public TableId tryIntern()
+ {
+ TableMetadata metadata = Schema.instance.getTableMetadata(this);
+ return metadata == null ? this : metadata.id;
+ }
+
@Override
public int compareTo(TableId o)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
index 50a48be1fb..26b7300c57 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
@@ -22,6 +22,8 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToLongFunction;
+import javax.annotation.Nullable;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
@@ -208,10 +210,10 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
* has completed, the state save will have either completed or failed.
*/
@VisibleForTesting
- public void save(ExecutorPlus executor, BiFunction<?, ?, Runnable>
saveFunction)
+ public void save(ExecutorPlus executor, Function<?, Runnable> saveFunction)
{
@SuppressWarnings("unchecked")
- State<K, V> savingOrLoaded = state.save((BiFunction<V, V, Runnable>)
saveFunction);
+ State<K, V> savingOrLoaded = state.save((Function<V, Runnable>)
saveFunction);
if (savingOrLoaded.status() == SAVING)
executor.submit(savingOrLoaded.saving());
state(savingOrLoaded);
@@ -319,7 +321,7 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
throw illegalState(this, "set(value)");
}
- default State<K, V> save(BiFunction<V, V, Runnable> saveFunction)
+ default State<K, V> save(Function<V, Runnable> saveFunction)
{
throw illegalState(this, "save(saveFunction)");
}
@@ -447,7 +449,7 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
@Override
public State<K, V> set(V value)
{
- return value == original ? this : new Modified<>(original, value);
+ return value == original ? this : new Modified<>(value);
}
@Override
@@ -499,12 +501,10 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
static class Modified<K, V> implements State<K, V>
{
- final V original;
V current;
- Modified(V original, V current)
+ Modified(V current)
{
- this.original = original;
this.current = current;
}
@@ -523,17 +523,14 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
@Override
public State<K, V> set(V value)
{
- if (value == original) // change reverted
- return new Loaded<>(original);
-
current = value;
return this;
}
@Override
- public State<K, V> save(BiFunction<V, V, Runnable> saveFunction)
+ public State<K, V> save(Function<V, Runnable> saveFunction)
{
- Runnable runnable = saveFunction.apply(original, current);
+ Runnable runnable = saveFunction.apply(current);
if (null == runnable) // null mutation -> null Runnable -> no
change on disk
return new Loaded<>(current);
else
@@ -543,8 +540,7 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
@Override
public long estimateOnHeapSize(ToLongFunction<V> estimateFunction)
{
- return (null == original ? 0 :
estimateFunction.applyAsLong(original))
- + (null == current ? 0 :
estimateFunction.applyAsLong(current));
+ return (null == current ? 0 :
estimateFunction.applyAsLong(current));
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 718c669f47..196d5d4913 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -79,11 +79,8 @@ import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Promise;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
-import static accord.primitives.SaveStatus.Applying;
import static accord.primitives.Status.Committed;
import static accord.primitives.Status.Invalidated;
-import static accord.primitives.Status.PreApplied;
-import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
import static accord.utils.Invariants.checkState;
@@ -290,12 +287,12 @@ public class AccordCommandStore extends CommandStore
@Nullable
@VisibleForTesting
- public Runnable appendToKeyspace(Command before, Command after)
+ public Runnable appendToKeyspace(Command after)
{
if (after.txnId().is(Routable.Domain.Key))
return null;
- Mutation mutation = AccordKeyspace.getCommandMutation(this.id, before,
after, nextSystemTimestampMicros());
+ Mutation mutation = AccordKeyspace.getCommandMutation(this.id, after,
nextSystemTimestampMicros());
// TODO (required): make sure we test recovering when this has failed
to be persisted
if (null != mutation)
@@ -360,14 +357,14 @@ public class AccordCommandStore extends CommandStore
}
@Nullable
- private Runnable saveTimestampsForKey(TimestampsForKey before,
TimestampsForKey after)
+ private Runnable saveTimestampsForKey(TimestampsForKey after)
{
- Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id,
before, after, nextSystemTimestampMicros());
+ Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id,
after, nextSystemTimestampMicros());
return null != mutation ? mutation::applyUnsafe : null;
}
@Nullable
- private Runnable saveCommandsForKey(CommandsForKey before, CommandsForKey
after)
+ private Runnable saveCommandsForKey(CommandsForKey after)
{
Mutation mutation = AccordKeyspace.getCommandsForKeyMutation(id,
after, nextSystemTimestampMicros());
return null != mutation ? mutation::applyUnsafe : null;
@@ -447,8 +444,8 @@ public class AccordCommandStore extends CommandStore
public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext,
Map<TxnId, AccordSafeCommand>
commands,
- NavigableMap<RoutingKey,
AccordSafeTimestampsForKey> timestampsForKeys,
- NavigableMap<RoutingKey,
AccordSafeCommandsForKey> commandsForKeys,
+ Map<RoutingKey,
AccordSafeTimestampsForKey> timestampsForKeys,
+ Map<RoutingKey,
AccordSafeCommandsForKey> commandsForKeys,
@Nullable
AccordSafeCommandsForRanges commandsForRanges)
{
checkState(current == null);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 725deb05fc..b3b04a5172 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -805,6 +805,18 @@ public class AccordKeyspace
}
}
+ private static <C, V> void addCell(ColumnMetadata column, Function<C, V>
get, SerializeFunction<V> serialize, Row.Builder builder, long timestampMicros,
int nowInSeconds, C current) throws IOException
+ {
+ V newValue = get.apply(current);
+ if (newValue == null) builder.addCell(tombstone(column,
timestampMicros, nowInSeconds));
+ else builder.addCell(live(column, timestampMicros,
serialize.apply(newValue)));
+ }
+
+ private static <C extends Command, V> void addCell(ColumnMetadata column,
Function<C, V> get, LocalVersionedSerializer<V> serializer, Row.Builder
builder, long timestampMicros, int nowInSeconds, C command) throws IOException
+ {
+ addCell(column, get, v -> serializeOrNull(v, serializer), builder,
timestampMicros, nowInSeconds, command);
+ }
+
private static <C extends Command, V> void
addCellIfModified(ColumnMetadata column, Function<C, V> get,
LocalVersionedSerializer<V> serializer, Row.Builder builder, long
timestampMicros, int nowInSeconds, C original, C command) throws IOException
{
addCellIfModified(column, get, v -> serializeOrNull(v, serializer),
builder, timestampMicros, nowInSeconds, original, command);
@@ -817,29 +829,34 @@ public class AccordKeyspace
addCellIfModified(column, get, v -> accessor.valueOf(v.ordinal()),
builder, timestampMicros, nowInSeconds, original, command);
}
+ private static <C extends Command, V extends Enum<V>> void
addEnumCell(ColumnMetadata column, Function<C, V> get, Row.Builder builder,
long timestampMicros, int nowInSeconds, C command) throws IOException
+ {
+ // TODO: convert to byte arrays
+ ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
+ addCell(column, get, v -> accessor.valueOf(v.ordinal()), builder,
timestampMicros, nowInSeconds, command);
+ }
+
public static Mutation getCommandMutation(AccordCommandStore commandStore,
AccordSafeCommand liveCommand, long timestampMicros)
{
- return getCommandMutation(commandStore.id(), liveCommand.original(),
liveCommand.current(), timestampMicros);
+ return getCommandMutation(commandStore.id(), liveCommand.current(),
timestampMicros);
}
- public static Mutation getCommandMutation(int storeId, Command original,
Command command, long timestampMicros)
+ public static Mutation getCommandMutation(int storeId, Command command,
long timestampMicros)
{
if (command.saveStatus() == SaveStatus.Uninitialised)
return null;
try
{
- Invariants.checkArgument(original != command);
-
Row.Builder builder = BTreeRow.unsortedBuilder();
builder.newRow(Clustering.EMPTY);
int nowInSeconds = (int)
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestampMicros,
nowInSeconds));
- addEnumCellIfModified(CommandsColumns.durability,
Command::durability, builder, timestampMicros, nowInSeconds, original, command);
- addCellIfModified(CommandsColumns.participants,
Command::participants, LocalVersionedSerializers.participants, builder,
timestampMicros, nowInSeconds, original, command);
- addEnumCellIfModified(CommandsColumns.status, Command::saveStatus,
builder, timestampMicros, nowInSeconds, original, command);
- addCellIfModified(CommandsColumns.execute_at, Command::executeAt,
AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds,
original, command);
+ addEnumCell(CommandsColumns.durability, Command::durability,
builder, timestampMicros, nowInSeconds, command);
+ addCell(CommandsColumns.participants, Command::participants,
LocalVersionedSerializers.participants, builder, timestampMicros, nowInSeconds,
command);
+ addEnumCell(CommandsColumns.status, Command::saveStatus, builder,
timestampMicros, nowInSeconds, command);
+ addCell(CommandsColumns.execute_at, Command::executeAt,
AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds,
command);
Row row = builder.build();
if (row.columnCount() == 0)
@@ -1080,11 +1097,10 @@ public class AccordKeyspace
return (TokenKey)
AccordRoutingKeyByteSource.Serializer.fromComparableBytes(ByteBufferAccessor.instance,
tokenBytes, tableId, currentVersion, null);
}
- public static Mutation getTimestampsForKeyMutation(int storeId,
TimestampsForKey original, TimestampsForKey current, long timestampMicros)
+ public static Mutation getTimestampsForKeyMutation(int storeId,
TimestampsForKey current, long timestampMicros)
{
try
{
- Invariants.checkArgument(original != current);
// TODO: convert to byte arrays
ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
@@ -1093,9 +1109,9 @@ public class AccordKeyspace
int nowInSeconds = (int)
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
LivenessInfo livenessInfo = LivenessInfo.create(timestampMicros,
nowInSeconds);
builder.addPrimaryKeyLivenessInfo(livenessInfo);
- addCellIfModified(TimestampsForKeyColumns.last_executed_timestamp,
TimestampsForKey::lastExecutedTimestamp, AccordKeyspace::serializeTimestamp,
builder, timestampMicros, nowInSeconds, original, current);
- addCellIfModified(TimestampsForKeyColumns.last_executed_micros,
TimestampsForKey::rawLastExecutedHlc, accessor::valueOf, builder,
timestampMicros, nowInSeconds, original, current);
- addCellIfModified(TimestampsForKeyColumns.last_write_timestamp,
TimestampsForKey::lastWriteTimestamp, AccordKeyspace::serializeTimestamp,
builder, timestampMicros, nowInSeconds, original, current);
+ addCell(TimestampsForKeyColumns.last_executed_timestamp,
TimestampsForKey::lastExecutedTimestamp, AccordKeyspace::serializeTimestamp,
builder, timestampMicros, nowInSeconds, current);
+ addCell(TimestampsForKeyColumns.last_executed_micros,
TimestampsForKey::rawLastExecutedHlc, accessor::valueOf, builder,
timestampMicros, nowInSeconds, current);
+ addCell(TimestampsForKeyColumns.last_write_timestamp,
TimestampsForKey::lastWriteTimestamp, AccordKeyspace::serializeTimestamp,
builder, timestampMicros, nowInSeconds, current);
Row row = builder.build();
if (row.columnCount() == 0)
@@ -1113,7 +1129,7 @@ public class AccordKeyspace
public static Mutation getTimestampsForKeyMutation(AccordCommandStore
commandStore, AccordSafeTimestampsForKey liveTimestamps, long timestampMicros)
{
- return getTimestampsForKeyMutation(commandStore.id(),
liveTimestamps.original(), liveTimestamps.current(), timestampMicros);
+ return getTimestampsForKeyMutation(commandStore.id(),
liveTimestamps.current(), timestampMicros);
}
public static UntypedResultSet loadTimestampsForKeyRow(CommandStore
commandStore, TokenKey key)
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index f97f8336b7..4025049d39 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -53,8 +53,8 @@ import accord.utils.Invariants;
public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey,
AccordSafeCommandsForKey>
{
private final Map<TxnId, AccordSafeCommand> commands;
- private final NavigableMap<RoutingKey, AccordSafeCommandsForKey>
commandsForKeys;
- private final NavigableMap<RoutingKey, AccordSafeTimestampsForKey>
timestampsForKeys;
+ private final Map<RoutingKey, AccordSafeCommandsForKey> commandsForKeys;
+ private final Map<RoutingKey, AccordSafeTimestampsForKey>
timestampsForKeys;
private final @Nullable AccordSafeCommandsForRanges commandsForRanges;
private final AccordCommandStore commandStore;
private RangesForEpoch ranges;
@@ -62,8 +62,8 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
private AccordSafeCommandStore(PreLoadContext context,
Map<TxnId, AccordSafeCommand> commands,
- NavigableMap<RoutingKey,
AccordSafeTimestampsForKey> timestampsForKey,
- NavigableMap<RoutingKey,
AccordSafeCommandsForKey> commandsForKey,
+ Map<RoutingKey, AccordSafeTimestampsForKey>
timestampsForKey,
+ Map<RoutingKey, AccordSafeCommandsForKey>
commandsForKey,
@Nullable AccordSafeCommandsForRanges
commandsForRanges,
AccordCommandStore commandStore)
{
@@ -80,8 +80,8 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
public static AccordSafeCommandStore create(PreLoadContext preLoadContext,
Map<TxnId, AccordSafeCommand>
commands,
- NavigableMap<RoutingKey,
AccordSafeTimestampsForKey> timestampsForKey,
- NavigableMap<RoutingKey,
AccordSafeCommandsForKey> commandsForKey,
+ Map<RoutingKey,
AccordSafeTimestampsForKey> timestampsForKey,
+ Map<RoutingKey,
AccordSafeCommandsForKey> commandsForKey,
@Nullable
AccordSafeCommandsForRanges commandsForRanges,
AccordCommandStore
commandStore)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index 019b888ded..ccbc90fdb2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -240,7 +240,7 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
Class<S> valClass,
Function<AccordCachingState<K, V>, S> safeRefFactory,
Function<K, V> loadFunction,
- BiFunction<V, V, Runnable> saveFunction,
+ Function<V, Runnable> saveFunction,
BiFunction<K, V, Boolean> validateFunction,
ToLongFunction<V> heapEstimator,
AccordCachingState.Factory<K, V> nodeFactory)
@@ -262,7 +262,7 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
Class<S> valClass,
Function<AccordCachingState<K, V>, S> safeRefFactory,
Function<K, V> loadFunction,
- BiFunction<V, V, Runnable> saveFunction,
+ Function<V, Runnable> saveFunction,
BiFunction<K, V, Boolean> validateFunction,
ToLongFunction<V> heapEstimator)
{
@@ -287,7 +287,7 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
private final Class<K> keyClass;
private final Function<AccordCachingState<K, V>, S> safeRefFactory;
private Function<K, V> loadFunction;
- private BiFunction<V, V, Runnable> saveFunction;
+ private Function<V, Runnable> saveFunction;
private final BiFunction<K, V, Boolean> validateFunction;
private final ToLongFunction<V> heapEstimator;
private long bytesCached;
@@ -303,7 +303,7 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
int index, Class<K> keyClass,
Function<AccordCachingState<K, V>, S> safeRefFactory,
Function<K, V> loadFunction,
- BiFunction<V, V, Runnable> saveFunction,
+ Function<V, Runnable> saveFunction,
BiFunction<K, V, Boolean> validateFunction,
ToLongFunction<V> heapEstimator,
AccordCachingState.Factory<K, V> nodeFactory)
@@ -643,7 +643,7 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
}
@VisibleForTesting
- public void unsafeSetSaveFunction(BiFunction<V, V, Runnable>
saveFunction)
+ public void unsafeSetSaveFunction(Function<V, Runnable> saveFunction)
{
this.saveFunction = saveFunction;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 6d8d2b8184..deec2b21ab 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -185,7 +185,7 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
@Override
public SentinelKey deserialize(DataInputPlus in, int version)
throws IOException
{
- TableId table = TableId.deserialize(in);
+ TableId table = TableId.deserialize(in).tryIntern();
boolean isMin = in.readBoolean();
return new SentinelKey(table, isMin);
}
@@ -287,14 +287,14 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
@Override
public TokenKey deserialize(DataInputPlus in, int version) throws
IOException
{
- TableId table = TableId.deserialize(in);
+ TableId table = TableId.deserialize(in).tryIntern();
Token token = Token.compactSerializer.deserialize(in,
getPartitioner(), version);
return new TokenKey(table, token);
}
public TokenKey fromBytes(ByteBuffer bytes, IPartitioner
partitioner)
{
- TableId tableId = TableId.deserialize(bytes,
ByteBufferAccessor.instance, 0);
+ TableId tableId = TableId.deserialize(bytes,
ByteBufferAccessor.instance, 0).tryIntern();
bytes.position(tableId.serializedSize());
Token token = Token.compactSerializer.deserialize(bytes,
partitioner);
return new TokenKey(tableId, token);
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
index fc78fe6692..aaa1264ea0 100644
--- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -154,7 +154,7 @@ public final class PartitionKey extends AccordRoutableKey
implements Key
@Override
public PartitionKey deserialize(DataInputPlus in, int version) throws
IOException
{
- TableId tableId = TableId.deserialize(in);
+ TableId tableId = TableId.deserialize(in).tryIntern();
IPartitioner partitioner =
Schema.instance.getExistingTablePartitioner(tableId);
DecoratedKey key =
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
return new PartitionKey(tableId, key);
@@ -162,7 +162,7 @@ public final class PartitionKey extends AccordRoutableKey
implements Key
public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor,
int offset) throws IOException
{
- TableId tableId = TableId.deserialize(src, accessor, offset);
+ TableId tableId = TableId.deserialize(src, accessor,
offset).tryIntern();
offset += tableId.serializedSize();
TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
int numBytes = accessor.getShort(src, offset);
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index cc5bee6d32..463350adf8 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -18,9 +18,7 @@
package org.apache.cassandra.service.accord.async;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -39,13 +37,13 @@ import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.utils.Invariants;
import accord.utils.async.AsyncChains;
+import org.agrona.collections.Object2ObjectHashMap;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordSafeCommand;
import org.apache.cassandra.service.accord.AccordSafeCommandStore;
import org.apache.cassandra.service.accord.AccordSafeCommandsForKey;
import org.apache.cassandra.service.accord.AccordSafeCommandsForRanges;
-import org.apache.cassandra.service.accord.AccordSafeState;
import org.apache.cassandra.service.accord.AccordSafeTimestampsForKey;
import org.apache.cassandra.service.accord.SavedCommand;
import org.apache.cassandra.utils.concurrent.Condition;
@@ -71,28 +69,28 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
static class Context
{
- final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
- final TreeMap<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey
= new TreeMap<>();
- final TreeMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey =
new TreeMap<>();
+ final Object2ObjectHashMap<TxnId, AccordSafeCommand> commands = new
Object2ObjectHashMap<>();
+ final Object2ObjectHashMap<RoutingKey, AccordSafeTimestampsForKey>
timestampsForKey = new Object2ObjectHashMap<>();
+ final Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey>
commandsForKey = new Object2ObjectHashMap<>();
@Nullable
AccordSafeCommandsForRanges commandsForRanges = null;
void releaseResources(AccordCommandStore commandStore)
{
// TODO (expected): we should destructively iterate to avoid
invoking second time in fail; or else read and set to null
- commands.values().forEach(commandStore.commandCache()::release);
+ commands.forEach((k, v) -> commandStore.commandCache().release(v));
commands.clear();
-
timestampsForKey.values().forEach(commandStore.timestampsForKeyCache()::release);
+ timestampsForKey.forEach((k, v) ->
commandStore.timestampsForKeyCache().release(v));
timestampsForKey.clear();
-
commandsForKey.values().forEach(commandStore.commandsForKeyCache()::release);
+ commandsForKey.forEach((k, v) ->
commandStore.commandsForKeyCache().release(v));
commandsForKey.clear();
}
void revertChanges()
{
- commands.values().forEach(AccordSafeState::revert);
- timestampsForKey.values().forEach(AccordSafeState::revert);
- commandsForKey.values().forEach(AccordSafeState::revert);
+ commands.forEach((k, v) -> v.revert());
+ timestampsForKey.forEach((k, v) -> v.revert());
+ commandsForKey.forEach((k, v) -> v.revert());
if (commandsForRanges != null)
commandsForRanges.revert();
}
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index a329be5864..ca59f202b7 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -522,7 +522,7 @@ public class CompactionAccordIteratorsTest
private static BiConsumer<Command, Command>
appendDiffToKeyspace(AccordCommandStore commandStore)
{
return (before, after) -> {
- AccordKeyspace.getCommandMutation(commandStore.id(), before,
after, commandStore.nextSystemTimestampMicros()).applyUnsafe();
+ AccordKeyspace.getCommandMutation(commandStore.id(), after,
commandStore.nextSystemTimestampMicros()).applyUnsafe();
};
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
index d82f143054..6f10d977b2 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
@@ -183,7 +183,7 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor, 500,
cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> instance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true, String::length);
assertCacheState(cache, 0, 0, 0);
SafeString safeString1 = instance.acquire("1");
@@ -215,9 +215,9 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor, 500,
cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> stringInstance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true,String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true,String::length);
AccordStateCache.Instance<Integer, Integer, SafeInt> intInstance =
- cache.instance(Integer.class, SafeInt.class, SafeInt::new, key ->
key, (original, current) -> null, (k, v) -> true,ignored -> Integer.BYTES);
+ cache.instance(Integer.class, SafeInt.class, SafeInt::new, key ->
key, (current) -> null, (k, v) -> true,ignored -> Integer.BYTES);
assertCacheState(cache, 0, 0, 0);
SafeString safeString1 = stringInstance.acquire("1");
@@ -255,7 +255,7 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor,
DEFAULT_NODE_SIZE * 5, cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> instance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true, String::length);
assertCacheState(cache, 0, 0, 0);
SafeString[] items = new SafeString[3];
@@ -295,7 +295,7 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor,
nodeSize(1) * 5, cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> instance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true, String::length);
assertCacheState(cache, 0, 0, 0);
SafeString[] items = new SafeString[5];
@@ -341,7 +341,7 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor,
nodeSize(1) * 4, cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> instance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true, String::length);
assertCacheState(cache, 0, 0, 0);
SafeString[] items = new SafeString[5];
@@ -380,7 +380,7 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor,
DEFAULT_NODE_SIZE * 4, cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> instance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true, String::length);
assertCacheState(cache, 0, 0, 0);
SafeString safeString1 = instance.acquire("0");
@@ -411,7 +411,7 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor,
nodeSize(1) * 3 + nodeSize(3), cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> instance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true, String::length);
assertCacheState(cache, 0, 0, 0);
SafeString item = instance.acquire(Integer.toString(0));
@@ -450,7 +450,7 @@ public class AccordStateCacheTest
ManualExecutor executor = new ManualExecutor();
AccordStateCache cache = new AccordStateCache(executor, executor, 500,
cacheMetrics);
AccordStateCache.Instance<String, String, SafeString> instance =
- cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (original, current) -> null, (k, v) -> true, String::length);
+ cache.instance(String.class, SafeString.class, SafeString::new,
key -> key, (current) -> null, (k, v) -> true, String::length);
assertCacheState(cache, 0, 0, 0);
SafeString safeString = instance.acquire("1");
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 680777bde8..5a09a23134 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.service.accord.async;
+import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -155,7 +156,7 @@ public class AsyncLoaderTest
timestamps.preExecute();
timestamps.initialize();
- AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), null,
timestamps.current(), commandStore.nextSystemTimestampMicros()).apply();
+ AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(),
timestamps.current(), commandStore.nextSystemTimestampMicros()).apply();
// resources are on disk only, so the loader should suspend...
AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId),
RoutingKeys.of(key), TIMESTAMPS);
@@ -203,7 +204,7 @@ public class AsyncLoaderTest
testLoad(executor, safeCommand, notDefined(txnId, txn));
commandCache.release(safeCommand);
- AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), null,
new TimestampsForKey(key), commandStore.nextSystemTimestampMicros()).apply();
+ AccordKeyspace.getTimestampsForKeyMutation(commandStore.id(), new
TimestampsForKey(key), commandStore.nextSystemTimestampMicros()).apply();
// resources are on disk only, so the loader should suspend...
AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId),
RoutingKeys.of(key), TIMESTAMPS);
@@ -353,7 +354,7 @@ public class AsyncLoaderTest
// acquire / release
commandCache.unsafeSetLoadFunction(id -> notDefined(id, txn));
- commandCache.unsafeSetSaveFunction((before, after) -> () -> { throw
new AssertionError("nodes expected to be saved manually"); });
+ commandCache.unsafeSetSaveFunction((after) -> () -> { throw new
AssertionError("nodes expected to be saved manually"); });
AccordSafeCommand safeCommand = commandCache.acquire(txnId);
testLoad(executor, safeCommand, notDefined(txnId, txn));
@@ -361,7 +362,7 @@ public class AsyncLoaderTest
commandCache.release(safeCommand);
Assert.assertEquals(AccordCachingState.Status.MODIFIED,
commandCache.getUnsafe(txnId).status());
- commandCache.getUnsafe(txnId).save(executor, (before, after) -> () ->
{});
+ commandCache.getUnsafe(txnId).save(executor, (after) -> () -> {});
Assert.assertEquals(AccordCachingState.Status.SAVING,
commandCache.getUnsafe(txnId).status());
// since the command is still saving, the loader shouldn't be able to
acquire a reference
@@ -402,7 +403,7 @@ public class AsyncLoaderTest
inProgressCFKSaveTest(TIMESTAMPS,
AccordCommandStore::timestampsForKeyCache, context -> context.timestampsForKey,
TimestampsForKey::new, (tfk, c) -> new TimestampsForKey(tfk.key(),
c.executeAt(), c.executeAt().hlc(), c.txnId(), c.executeAt()));
}
- private <T1, T2 extends AccordSafeState<RoutingKey, T1>, C extends
AccordStateCache.Instance<RoutingKey, T1, T2>> void
inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C>
getter, Function<Context, TreeMap<?, ?>> inContext, Function<RoutingKey, T1>
initialiser, BiFunction<T1, Command, T1> update)
+ private <T1, T2 extends AccordSafeState<RoutingKey, T1>, C extends
AccordStateCache.Instance<RoutingKey, T1, T2>> void
inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C>
getter, Function<Context, Map<?, ?>> inContext, Function<RoutingKey, T1>
initialiser, BiFunction<T1, Command, T1> update)
{
AtomicLong clock = new AtomicLong(0);
ManualExecutor executor = new ManualExecutor();
@@ -410,7 +411,7 @@ public class AsyncLoaderTest
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl",
executor, executor);
C cache = getter.apply(commandStore);
- cache.unsafeSetSaveFunction((before, after) -> () -> { throw new
AssertionError("nodes expected to be saved manually"); });
+ cache.unsafeSetSaveFunction((after) -> () -> { throw new
AssertionError("nodes expected to be saved manually"); });
TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(0);
@@ -424,7 +425,7 @@ public class AsyncLoaderTest
cache.release(safe);
Assert.assertEquals(AccordCachingState.Status.MODIFIED,
cache.getUnsafe(key).status());
- cache.getUnsafe(key).save(executor, (before, after) -> () -> {});
+ cache.getUnsafe(key).save(executor, (after) -> () -> {});
Assert.assertEquals(AccordCachingState.Status.SAVING,
cache.getUnsafe(key).status());
// since the command is still saving, the loader shouldn't be able to
acquire a reference
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]