This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 83a5f7a69d Command table now uses a local version added to the payload
rather than rely on a version column, and fixed a few feedback related issues
83a5f7a69d is described below
commit 83a5f7a69d567536bbb2b46efd350a1581149254
Author: David Capwell <[email protected]>
AuthorDate: Fri Oct 21 16:29:28 2022 -0700
Command table now uses a local version added to the payload rather than
rely on a version column, and fixed a few feedback related issues
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-17103
---
.build/include-accord.sh | 4 +-
.../db/filter/AbstractClusteringIndexFilter.java | 1 -
.../db/filter/ClusteringIndexSliceFilter.java | 2 -
.../apache/cassandra/db/partitions/Partition.java | 2 -
.../cassandra/io/LocalVersionedSerializer.java | 94 +++++++++++++
.../cassandra/io/MessageVersionProvider.java | 24 ++++
.../apache/cassandra/net/ResponseVerbHandler.java | 2 +-
.../cassandra/service/accord/AccordCommand.java | 1 +
.../service/accord/AccordCommandStore.java | 11 +-
.../service/accord/AccordCommandsForKey.java | 2 +-
.../cassandra/service/accord/AccordKeyspace.java | 145 ++++++++-------------
.../service/accord/AccordMessageSink.java | 3 +-
.../service/accord/AccordPartialCommand.java | 85 +++++-------
.../service/accord/AccordSerializerVersion.java | 114 ++++++++++++++++
.../cassandra/service/accord/AccordStateCache.java | 24 ++--
.../service/accord/async/AsyncLoader.java | 6 +-
.../service/accord/async/AsyncOperation.java | 26 ++--
.../service/accord/async/AsyncWriter.java | 14 +-
.../accord/serializers/AcceptSerializers.java | 6 +-
.../cassandra/simulator/asm/InterceptClasses.java | 3 +-
.../cassandra/simulator/SimulationRunner.java | 1 +
.../apache/cassandra/simulator/SimulatorUtils.java | 7 +-
.../service/accord/async/AsyncLoaderTest.java | 19 +--
.../service/accord/async/AsyncWriterTest.java | 3 +-
24 files changed, 382 insertions(+), 217 deletions(-)
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index 7069e22b6f..0dc01bf2ba 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -24,8 +24,8 @@ set -o nounset
bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
-accord_repo='https://github.com/bdeggleston/cassandra-accord.git'
-accord_branch='metadata-persistence'
+accord_repo='https://github.com/apache/cassandra-accord.git'
+accord_branch='trunk'
accord_src="$bin/cassandra-accord"
checkout() {
diff --git
a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index e1792d17f8..ddcaaed812 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.filter;
import java.io.IOException;
-import java.util.Objects;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
diff --git
a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index bcf787fee7..178c96b453 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.filter;
import java.io.IOException;
import java.util.List;
import java.nio.ByteBuffer;
-import java.util.Objects;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
@@ -31,7 +30,6 @@ import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.stringtemplate.v4.ST;
/**
* A filter over a single partition.
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java
b/src/java/org/apache/cassandra/db/partitions/Partition.java
index 1daf20cd00..601934a8e7 100644
--- a/src/java/org/apache/cassandra/db/partitions/Partition.java
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -21,12 +21,10 @@ import java.util.NavigableSet;
import javax.annotation.Nullable;
-import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.service.accord.api.AccordKey;
/**
* In-memory representation of a Partition.
diff --git a/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java
b/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java
new file mode 100644
index 0000000000..739f007846
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Serializer that stores the version within the buffer. Normal usage of
{@link IVersionedSerializer} is to rely on
+ * {@link org.apache.cassandra.net.MessagingService#current_version} and
messaging version numbers, but that implies
+ * that a messaging version bump is required if a change is made to this
field; for some cases the serializer isn't
+ * dealing with messages and instead are blobs stored in a table, for these
cases it may be better to rely on a field
+ * specific versioning that gets stored along the data.
+ */
+public class LocalVersionedSerializer<I>
+{
+ private final MessageVersionProvider currentVersion;
+ private final IVersionedSerializer<MessageVersionProvider>
versionSerializer;
+ private final IVersionedSerializer<I> serializer;
+
+ public <V extends MessageVersionProvider> LocalVersionedSerializer(V
currentVersion,
+
IVersionedSerializer<V> versionSerializer,
+
IVersionedSerializer<I> serializer)
+ {
+ // V is local to the constructor to validate at construction time
things are fine, but don't want in the type
+ // sig of the class as it just gets verbose...
+ this.currentVersion = Objects.requireNonNull(currentVersion);
+ this.versionSerializer =
(IVersionedSerializer<MessageVersionProvider>)
Objects.requireNonNull(versionSerializer);
+ this.serializer = Objects.requireNonNull(serializer);
+ }
+
+ public IVersionedSerializer<I> serializer()
+ {
+ return serializer;
+ }
+
+ /**
+ * Serialize the specified type into the specified DataOutputStream
instance.
+ *
+ * @param t type that needs to be serialized
+ * @param out DataOutput into which serialization needs to happen.
+ * @throws IOException if serialization fails
+ */
+ public void serialize(I t, DataOutputPlus out) throws IOException
+ {
+ versionSerializer.serialize(currentVersion, out,
currentVersion.messageVersion());
+ serializer.serialize(t, out, currentVersion.messageVersion());
+ }
+
+ /**
+ * Deserialize into the specified DataInputStream instance.
+ *
+ * @param in DataInput from which deserialization needs to happen.
+ * @return the type that was deserialized
+ * @throws IOException if deserialization fails
+ */
+ public I deserialize(DataInputPlus in) throws IOException
+ {
+ MessageVersionProvider version = versionSerializer.deserialize(in,
currentVersion.messageVersion());
+ return serializer.deserialize(in, version.messageVersion());
+ }
+
+ /**
+ * Calculate serialized size of object without actually serializing.
+ *
+ * @param t object to calculate serialized size
+ * @return serialized size of object t
+ */
+ public long serializedSize(I t)
+ {
+ long size = versionSerializer.serializedSize(currentVersion,
currentVersion.messageVersion());
+ size += serializer.serializedSize(t, currentVersion.messageVersion());
+ return size;
+ }
+}
diff --git a/src/java/org/apache/cassandra/io/MessageVersionProvider.java
b/src/java/org/apache/cassandra/io/MessageVersionProvider.java
new file mode 100644
index 0000000000..a6ad468281
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/MessageVersionProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+public interface MessageVersionProvider
+{
+ int messageVersion();
+}
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 3518888591..1cee468cd3 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.tracing.Tracing;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
-public class ResponseVerbHandler implements IVerbHandler
+class ResponseVerbHandler implements IVerbHandler
{
public static final ResponseVerbHandler instance = new
ResponseVerbHandler();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 3c373e60df..00feea4c4e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -184,6 +184,7 @@ public class AccordCommand extends Command implements
AccordState<TxnId>
// ", txn=" + txn +
// ", writes=" + writes +
// ", result=" + result +
+ ", txn is null?=" + (txn.get() == null) +
", isGloballyPersistent=" + isGloballyPersistent +
", waitingOnCommit=" + waitingOnCommit +
", waitingOnApply=" + waitingOnApply +
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index f8659e0b3c..ce6c8b0e68 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.service.accord;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -172,7 +173,8 @@ public class AccordCommandStore extends CommandStore
public Command command(TxnId txnId)
{
AccordCommand command = getCommandInternal(txnId);
- if (command.isEmpty()) command.initialize();
+ if (command.isEmpty())
+ command.initialize();
return command;
}
@@ -190,8 +192,9 @@ public class AccordCommandStore extends CommandStore
private AccordCommandsForKey getCommandsForKeyInternal(Key key)
{
- Preconditions.checkState(currentCtx != null);
- Preconditions.checkArgument(key instanceof PartitionKey);
+ Objects.requireNonNull(currentCtx, "current context");
+ if (!(key instanceof PartitionKey))
+ throw new IllegalArgumentException("Attempted to use
non-PartitionKey; given " + key.getClass());
AccordCommandsForKey commandsForKey =
currentCtx.commandsForKey.get((PartitionKey) key);
if (commandsForKey == null)
throw new IllegalArgumentException("No commandsForKey in context
for key " + key);
@@ -263,7 +266,7 @@ public class AccordCommandStore extends CommandStore
}
catch (ExecutionException e)
{
- throw new RuntimeException(e);
+ throw new RuntimeException(e.getCause());
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index 54ce46a3f9..d2d84899fc 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -408,7 +408,7 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
// we use the executeAt time instead of the monotonic database
timestamp to prevent uneven
// ttl expiration in extreme cases, ie 1M+ writes/second to a key
causing timestamps to overflow
// into the next second on some keys and not others.
- return (int)
TimeUnit.MICROSECONDS.toSeconds(getTimestampMicros(lastExecutedTimestamp.get()));
+ return
Math.toIntExact(TimeUnit.MICROSECONDS.toSeconds(getTimestampMicros(lastExecutedTimestamp.get())));
}
public long timestampMicrosFor(Timestamp executeAt, boolean isForWriteTxn)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index d2c3ef6928..d37f39a32c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -33,8 +33,6 @@ import java.util.function.Supplier;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +44,8 @@ import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.txn.Txn;
+import accord.txn.Writes;
import accord.utils.DeterministicIdentitySet;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.UntypedResultSet;
@@ -80,9 +80,9 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.LocalVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Functions;
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -115,7 +115,6 @@ public class AccordKeyspace
private static final Logger logger =
LoggerFactory.getLogger(AccordKeyspace.class);
public static final String COMMANDS = "commands";
- public static final String COMMAND_SERIES = "command_series";
public static final String COMMANDS_FOR_KEY = "commands_for_key";
private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, int,
bigint>";
@@ -134,20 +133,14 @@ public class AccordKeyspace
+ format("txn_id %s,", TIMESTAMP_TUPLE)
+ "status int,"
+ "home_key blob,"
- + "home_key_version int,"
+ "progress_key blob,"
- + "progress_key_version blob,"
+ "is_globally_persistent boolean,"
- + "txn_version int,"
+ "txn blob,"
+ format("execute_at %s,", TIMESTAMP_TUPLE)
+ format("promised_ballot %s,", TIMESTAMP_TUPLE)
+ format("accepted_ballot %s,", TIMESTAMP_TUPLE)
- + "dependencies_version int,"
+ "dependencies blob,"
- + "writes_version int,"
+ "writes blob,"
- + "result_version int,"
+ "result blob,"
+ format("waiting_on_commit map<%s, blob>,", TIMESTAMP_TUPLE)
+ format("waiting_on_apply map<%s, blob>,", TIMESTAMP_TUPLE)
@@ -157,6 +150,20 @@ public class AccordKeyspace
+ "PRIMARY KEY((store_generation, store_index, txn_id))"
+ ')');
+ private static class CommandsSerializers
+ {
+ static final LocalVersionedSerializer<AccordKey> ACCORD_KEY_SERIALIZER
= localSerializer(AccordKey.serializer);
+ static final LocalVersionedSerializer<Txn> TXN_SERIALIZER =
localSerializer(CommandSerializers.txn);
+ static final LocalVersionedSerializer<Deps> DEPS_SERIALIZER =
localSerializer(CommandSerializers.deps);
+ static final LocalVersionedSerializer<Writes> WRITES_SERIALIZER =
localSerializer(CommandSerializers.writes);
+ static final LocalVersionedSerializer<AccordData>
RESULT_DATA_SERIALIZER = localSerializer(AccordData.serializer);
+
+ private static <T> LocalVersionedSerializer<T>
localSerializer(IVersionedSerializer<T> serializer)
+ {
+ return new
LocalVersionedSerializer<>(AccordSerializerVersion.CURRENT,
AccordSerializerVersion.serializer, serializer);
+ }
+ }
+
private static ColumnMetadata getColumn(TableMetadata metadata, String
name)
{
ColumnMetadata column = metadata.getColumn(new ColumnIdentifier(name,
true));
@@ -170,20 +177,14 @@ public class AccordKeyspace
static final ClusteringComparator keyComparator =
Commands.partitionKeyAsClusteringComparator();
static final ColumnMetadata status = getColumn(Commands, "status");
static final ColumnMetadata home_key = getColumn(Commands, "home_key");
- static final ColumnMetadata home_key_version = getColumn(Commands,
"home_key_version");
static final ColumnMetadata progress_key = getColumn(Commands,
"progress_key");
- static final ColumnMetadata progress_key_version = getColumn(Commands,
"progress_key_version");
static final ColumnMetadata is_globally_persistent =
getColumn(Commands, "is_globally_persistent");
- static final ColumnMetadata txn_version = getColumn(Commands,
"txn_version");
static final ColumnMetadata txn = getColumn(Commands, "txn");
static final ColumnMetadata execute_at = getColumn(Commands,
"execute_at");
static final ColumnMetadata promised_ballot = getColumn(Commands,
"promised_ballot");
static final ColumnMetadata accepted_ballot = getColumn(Commands,
"accepted_ballot");
- static final ColumnMetadata dependencies_version = getColumn(Commands,
"dependencies_version");
static final ColumnMetadata dependencies = getColumn(Commands,
"dependencies");
- static final ColumnMetadata writes_version = getColumn(Commands,
"writes_version");
static final ColumnMetadata writes = getColumn(Commands, "writes");
- static final ColumnMetadata result_version = getColumn(Commands,
"result_version");
static final ColumnMetadata result = getColumn(Commands, "result");
static final ColumnMetadata waiting_on_commit = getColumn(Commands,
"waiting_on_commit");
static final ColumnMetadata waiting_on_apply = getColumn(Commands,
"waiting_on_apply");
@@ -235,15 +236,21 @@ public class AccordKeyspace
return commandsForKey.maxTimestamp.hasModifications()
|| commandsForKey.lastExecutedTimestamp.hasModifications()
|| commandsForKey.lastExecutedMicros.hasModifications()
+ || commandsForKey.lastWriteTimestamp.hasModifications()
|| commandsForKey.blindWitnessed.hasModifications();
}
+ private static boolean hasRegularChanges(AccordCommandsForKey
commandsForKey)
+ {
+ return commandsForKey.uncommitted.map.hasModifications()
+ || commandsForKey.committedById.map.hasModifications()
+ ||
commandsForKey.committedByExecuteAt.map.hasModifications();
+ }
+
static RegularAndStaticColumns columnsFor(AccordCommandsForKey
commandsForKey)
{
boolean hasStaticChanges = hasStaticChanges(commandsForKey);
- boolean hasRegularChanges =
commandsForKey.uncommitted.map.hasAdditions()
- ||
commandsForKey.committedById.map.hasAdditions()
- ||
commandsForKey.committedByExecuteAt.map.hasAdditions();
+ boolean hasRegularChanges = hasRegularChanges(commandsForKey);
if (hasStaticChanges && hasRegularChanges)
return all;
@@ -252,7 +259,7 @@ public class AccordKeyspace
else if (hasRegularChanges)
return justRegular;
else
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("CommandsForKey
has_modifications=" + commandsForKey.hasModifications() + ", but no Static or
Regular columns changed!");
}
}
@@ -275,41 +282,34 @@ public class AccordKeyspace
return Tables.of(Commands, CommandsForKey);
}
- private static <T> ByteBuffer serialize(T obj, IVersionedSerializer<T>
serializer, int version) throws IOException
+ private static <T> ByteBuffer serialize(T obj, LocalVersionedSerializer<T>
serializer) throws IOException
{
- int size = (int) serializer.serializedSize(obj, version);
+ int size = (int) serializer.serializedSize(obj);
try (DataOutputBuffer out = new DataOutputBuffer(size))
{
- serializer.serialize(obj, out, version);
- assert size == out.buffer().limit();
- return out.buffer();
+ serializer.serialize(obj, out);
+ ByteBuffer bb = out.buffer();
+ assert size == bb.limit() : String.format("Expected to write %d
but wrote %d", size, bb.limit());
+ return bb;
}
}
- private static <T> ByteBuffer serializeOrNull(T obj,
IVersionedSerializer<T> serializer, int version) throws IOException
+ private static <T> ByteBuffer serializeOrNull(T obj,
LocalVersionedSerializer<T> serializer) throws IOException
{
- return obj != null ? serialize(obj, serializer, version) :
EMPTY_BYTE_BUFFER;
+ return obj != null ? serialize(obj, serializer) : EMPTY_BYTE_BUFFER;
}
- private static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T>
serializer, int version) throws IOException
+ private static <T> T deserialize(ByteBuffer bytes,
LocalVersionedSerializer<T> serializer) throws IOException
{
try (DataInputBuffer in = new DataInputBuffer(bytes, true))
{
- return serializer.deserialize(in, version);
+ return serializer.deserialize(in);
}
}
- private static <T> T deserializeOrNull(ByteBuffer bytes,
IVersionedSerializer<T> serializer, int version) throws IOException
+ private static <T> T deserializeOrNull(ByteBuffer bytes,
LocalVersionedSerializer<T> serializer) throws IOException
{
- return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ?
deserialize(bytes, serializer, version) : null;
- }
-
- private static Map<ByteBuffer, ByteBuffer> serializeWaitingOn(Map<TxnId,
ByteBuffer> waitingOn)
- {
- Map<ByteBuffer, ByteBuffer> result =
Maps.newHashMapWithExpectedSize(waitingOn.size());
- for (Map.Entry<TxnId, ByteBuffer> entry : waitingOn.entrySet())
- result.put(serializeTimestamp(entry.getKey()), entry.getValue());
- return result;
+ return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ?
deserialize(bytes, serializer) : null;
}
private static NavigableMap<TxnId, ByteBuffer>
deserializeWaitingOn(Map<ByteBuffer, ByteBuffer> serialized)
@@ -345,16 +345,6 @@ public class AccordKeyspace
return deserializeTimestampSet(row.getSet(name, BytesType.instance),
TreeSet::new, TxnId::new);
}
- public static Set<ByteBuffer> serializeListeners(Set<ListenerProxy>
listeners)
- {
- Set<ByteBuffer> result =
Sets.newHashSetWithExpectedSize(listeners.size());
- for (ListenerProxy listener : listeners)
- {
- result.add(listener.identifier());
- }
- return result;
- }
-
private static DeterministicIdentitySet<ListenerProxy>
deserializeListeners(CommandStore commandStore, Set<ByteBuffer> serialized)
throws IOException
{
if (serialized == null || serialized.isEmpty())
@@ -433,32 +423,22 @@ public class AccordKeyspace
Row.Builder builder = BTreeRow.unsortedBuilder();
builder.newRow(Clustering.EMPTY);
int nowInSeconds = (int)
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
- int version = MessagingService.current_version;
- ByteBuffer versionBytes = accessor.valueOf(version);
+
if (command.status.hasModifications())
builder.addCell(live(CommandsColumns.status, timestampMicros,
accessor.valueOf(command.status.get().ordinal())));
if (command.homeKey.hasModifications())
- {
- builder.addCell(live(CommandsColumns.home_key_version,
timestampMicros, versionBytes));
- builder.addCell(live(CommandsColumns.home_key,
timestampMicros, serializeOrNull((AccordKey) command.homeKey.get(),
AccordKey.serializer, version)));
- }
+ builder.addCell(live(CommandsColumns.home_key,
timestampMicros, serializeOrNull((AccordKey) command.homeKey.get(),
CommandsSerializers.ACCORD_KEY_SERIALIZER)));
if (command.progressKey.hasModifications())
- {
- builder.addCell(live(CommandsColumns.progress_key_version,
timestampMicros, versionBytes));
- builder.addCell(live(CommandsColumns.progress_key,
timestampMicros, serializeOrNull((AccordKey) command.progressKey.get(),
AccordKey.serializer, version)));
- }
+ builder.addCell(live(CommandsColumns.progress_key,
timestampMicros, serializeOrNull((AccordKey) command.progressKey.get(),
CommandsSerializers.ACCORD_KEY_SERIALIZER)));
if (command.isGloballyPersistent.hasModifications())
builder.addCell(live(CommandsColumns.is_globally_persistent,
timestampMicros, accessor.valueOf(command.isGloballyPersistent.get())));
if (command.txn.hasModifications())
- {
- builder.addCell(live(CommandsColumns.txn_version,
timestampMicros, versionBytes));
- builder.addCell(live(CommandsColumns.txn, timestampMicros,
serializeOrNull(command.txn.get(), CommandSerializers.txn, version)));
- }
+ builder.addCell(live(CommandsColumns.txn, timestampMicros,
serializeOrNull(command.txn.get(), CommandsSerializers.TXN_SERIALIZER)));
if (command.executeAt.hasModifications())
builder.addCell(live(CommandsColumns.execute_at,
timestampMicros, serializeTimestamp(command.executeAt.get())));
@@ -470,22 +450,13 @@ public class AccordKeyspace
builder.addCell(live(CommandsColumns.accepted_ballot,
timestampMicros, serializeTimestamp(command.accepted.get())));
if (command.deps.hasModifications())
- {
- builder.addCell(live(CommandsColumns.dependencies_version,
timestampMicros, versionBytes));
- builder.addCell(live(CommandsColumns.dependencies,
timestampMicros, serialize(command.deps.get(), CommandSerializers.deps,
version)));
- }
+ builder.addCell(live(CommandsColumns.dependencies,
timestampMicros, serialize(command.deps.get(),
CommandsSerializers.DEPS_SERIALIZER)));
if (command.writes.hasModifications())
- {
- builder.addCell(live(CommandsColumns.writes_version,
timestampMicros, versionBytes));
- builder.addCell(live(CommandsColumns.writes, timestampMicros,
serialize(command.writes.get(), CommandSerializers.writes, version)));
- }
+ builder.addCell(live(CommandsColumns.writes, timestampMicros,
serialize(command.writes.get(), CommandsSerializers.WRITES_SERIALIZER)));
if (command.result.hasModifications())
- {
- builder.addCell(live(CommandsColumns.result_version,
timestampMicros, versionBytes));
- builder.addCell(live(CommandsColumns.result, timestampMicros,
serialize((AccordData) command.result.get(), AccordData.serializer, version)));
- }
+ builder.addCell(live(CommandsColumns.result, timestampMicros,
serialize((AccordData) command.result.get(),
CommandsSerializers.RESULT_DATA_SERIALIZER)));
if (command.waitingOnCommit.hasModifications())
{
@@ -535,7 +506,6 @@ public class AccordKeyspace
private static ByteBuffer serializeKey(PartitionKey key)
{
- UUIDSerializer.instance.serialize(key.tableId().asUUID());
return TupleType.buildValue(new
ByteBuffer[]{UUIDSerializer.instance.serialize(key.tableId().asUUID()),
key.partitionKey().getKey()});
}
@@ -570,12 +540,12 @@ public class AccordKeyspace
return command;
}
- private static <T> T deserializeWithVersionOr(UntypedResultSet.Row row,
String dataColumn, String versionColumn, IVersionedSerializer<T> serializer,
Supplier<T> defaultSupplier) throws IOException
+ private static <T> T deserializeWithVersionOr(UntypedResultSet.Row row,
String dataColumn, LocalVersionedSerializer<T> serializer, Supplier<T>
defaultSupplier) throws IOException
{
- if (!row.has(versionColumn))
+ if (!row.has(dataColumn))
return defaultSupplier.get();
- return deserialize(row.getBlob(dataColumn), serializer,
row.getInt(versionColumn));
+ return deserialize(row.getBlob(dataColumn), serializer);
}
public static UntypedResultSet loadCommandRow(CommandStore commandStore,
TxnId txnId)
@@ -598,11 +568,6 @@ public class AccordKeyspace
AccordCommandStore commandStore = command.commandStore();
commandStore.checkNotInStoreThread();
- String cql = "SELECT * FROM %s.%s " +
- "WHERE store_generation=? " +
- "AND store_index=? " +
- "AND txn_id=(?, ?, ?, ?)";
-
UntypedResultSet result = loadCommandRow(commandStore,
command.txnId());
if (result.isEmpty())
@@ -616,16 +581,16 @@ public class AccordKeyspace
UntypedResultSet.Row row = result.one();
Preconditions.checkState(deserializeTimestampOrNull(row, "txn_id",
TxnId::new).equals(txnId));
command.status.load(Status.values()[row.getInt("status")]);
- command.homeKey.load(deserializeOrNull(row.getBlob("home_key"),
AccordKey.serializer, row.getInt("home_key_version")));
-
command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"),
AccordKey.serializer, row.getInt("progress_key_version")));
+ command.homeKey.load(deserializeOrNull(row.getBlob("home_key"),
CommandsSerializers.ACCORD_KEY_SERIALIZER));
+
command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"),
CommandsSerializers.ACCORD_KEY_SERIALIZER));
command.isGloballyPersistent.load(row.getBoolean("is_globally_persistent"));
- command.txn.load(deserializeOrNull(row.getBlob("txn"),
CommandSerializers.txn, row.getInt("txn_version")));
+ command.txn.load(deserializeOrNull(row.getBlob("txn"),
CommandsSerializers.TXN_SERIALIZER));
command.executeAt.load(deserializeTimestampOrNull(row,
"execute_at", Timestamp::new));
command.promised.load(deserializeTimestampOrNull(row,
"promised_ballot", Ballot::new));
command.accepted.load(deserializeTimestampOrNull(row,
"accepted_ballot", Ballot::new));
- command.deps.load(deserializeWithVersionOr(row, "dependencies",
"dependencies_version", CommandSerializers.deps, () -> Deps.NONE));
- command.writes.load(deserializeWithVersionOr(row, "writes",
"writes_version", CommandSerializers.writes, () -> null));
- command.result.load(deserializeWithVersionOr(row, "result",
"result_version", AccordData.serializer, () -> null));
+ command.deps.load(deserializeWithVersionOr(row, "dependencies",
CommandsSerializers.DEPS_SERIALIZER, () -> Deps.NONE));
+ command.writes.load(deserializeWithVersionOr(row, "writes",
CommandsSerializers.WRITES_SERIALIZER, () -> null));
+ command.result.load(deserializeWithVersionOr(row, "result",
CommandsSerializers.RESULT_DATA_SERIALIZER, () -> null));
command.waitingOnCommit.load(deserializeWaitingOn(row,
"waiting_on_commit"));
command.blockingCommitOn.load(deserializeBlocking(row,
"blocking_commit_on"));
command.waitingOnApply.load(deserializeWaitingOn(row,
"waiting_on_apply"));
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index b723b27e6e..32c136b1b7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service.accord;
import java.util.EnumMap;
import java.util.Map;
+import java.util.Objects;
import com.google.common.base.Preconditions;
@@ -89,7 +90,7 @@ public class AccordMessageSink implements MessageSink
public void send(Node.Id to, Request request)
{
Verb verb = getVerb(request.type());
- Preconditions.checkArgument(verb != null);
+ Objects.requireNonNull(verb, "verb");
Message<Request> message = Message.out(verb, request);
InetAddressAndPort endpoint = getEndpoint(to);
logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 1419a7595f..c2c3010e43 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -32,12 +32,10 @@ import accord.primitives.Deps;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.txn.Txn;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.accord.async.AsyncContext;
import org.apache.cassandra.service.accord.serializers.CommandSerializers;
@@ -62,7 +60,7 @@ public class AccordPartialCommand implements PartialCommand
}
@Override
- public PartialCommand deserializeBody(TxnId txnId, Txn txn, Timestamp
executeAt, Status status, DataInputPlus in, Version version) throws IOException
+ public PartialCommand deserializeBody(TxnId txnId, Txn txn, Timestamp
executeAt, Status status, DataInputPlus in, AccordSerializerVersion version)
throws IOException
{
return new AccordPartialCommand(txnId, txn, executeAt, status);
}
@@ -121,7 +119,8 @@ public class AccordPartialCommand implements PartialCommand
public void forEachRemovedListener(Consumer<Listener> consumer)
{
- removedListeners.forEach(consumer);
+ if (removedListeners != null)
+ removedListeners.forEach(consumer);
}
@Override
@@ -168,24 +167,24 @@ public class AccordPartialCommand implements
PartialCommand
}
@Override
- public PartialCommand.WithDeps deserializeBody(TxnId txnId, Txn
txn, Timestamp executeAt, Status status, DataInputPlus in, Version version)
throws IOException
+ public PartialCommand.WithDeps deserializeBody(TxnId txnId, Txn
txn, Timestamp executeAt, Status status, DataInputPlus in,
AccordSerializerVersion version) throws IOException
{
- Deps deps = deserializeNullable(in, version.msg_version,
CommandSerializers.deps);
+ Deps deps = deserializeNullable(in, version.msgVersion,
CommandSerializers.deps);
return new AccordPartialCommand.WithDeps(txnId, txn,
executeAt, status, deps);
}
@Override
- public void serialize(PartialCommand.WithDeps command,
DataOutputPlus out, Version version) throws IOException
+ public void serialize(PartialCommand.WithDeps command,
DataOutputPlus out, AccordSerializerVersion version) throws IOException
{
super.serialize(command, out, version);
- serializeNullable(command.savedDeps(), out,
version.msg_version, CommandSerializers.deps);
+ serializeNullable(command.savedDeps(), out,
version.msgVersion, CommandSerializers.deps);
}
@Override
- public int serializedSize(PartialCommand.WithDeps command, Version
version)
+ public int serializedSize(PartialCommand.WithDeps command,
AccordSerializerVersion version)
{
int size = super.serializedSize(command, version) ;
- size += serializedSizeNullable(command.savedDeps(),
version.msg_version, CommandSerializers.deps);
+ size += serializedSizeNullable(command.savedDeps(),
version.msgVersion, CommandSerializers.deps);
return size;
}
@@ -228,44 +227,18 @@ public class AccordPartialCommand implements
PartialCommand
public static abstract class PartialCommandSerializer<T extends
PartialCommand>
{
- public enum Version
+ public void serialize(T command, DataOutputPlus out,
AccordSerializerVersion version) throws IOException
{
- VERSION_0(0, MessagingService.current_version);
- final byte version;
- final int msg_version;
-
- Version(int version, int msg_version)
- {
- this.version = (byte) version;
- this.msg_version = msg_version;
- }
-
- public static final Version current = VERSION_0;
-
- public static Version fromByte(byte b)
- {
- switch (b)
- {
- case 0:
- return VERSION_0;
- default:
- throw new IllegalArgumentException();
- }
- }
- }
-
- public void serialize(T command, DataOutputPlus out, Version version)
throws IOException
- {
- out.write(version.version);
- CommandSerializers.txnId.serialize(command.txnId(), out,
version.msg_version);
- CommandSerializers.status.serialize(command.status(), out,
version.msg_version);
- serializeNullable(command.txn(), out, version.msg_version,
CommandSerializers.txn);
- serializeNullable(command.executeAt(), out, version.msg_version,
CommandSerializers.timestamp);
+ AccordSerializerVersion.serializer.serialize(version, out);
+ CommandSerializers.txnId.serialize(command.txnId(), out,
version.msgVersion);
+ CommandSerializers.status.serialize(command.status(), out,
version.msgVersion);
+ serializeNullable(command.txn(), out, version.msgVersion,
CommandSerializers.txn);
+ serializeNullable(command.executeAt(), out, version.msgVersion,
CommandSerializers.timestamp);
}
public ByteBuffer serialize(T command)
{
- Version version = Version.current;
+ AccordSerializerVersion version = AccordSerializerVersion.CURRENT;
int size = serializedSize(command, version);
try (DataOutputBuffer out = new DataOutputBuffer(size))
{
@@ -278,24 +251,24 @@ public class AccordPartialCommand implements
PartialCommand
}
}
- public Version deserializeVersion(DataInputPlus in) throws IOException
+ public AccordSerializerVersion deserializeVersion(DataInputPlus in)
throws IOException
{
- return Version.fromByte(in.readByte());
+ return AccordSerializerVersion.serializer.deserialize(in);
}
// check for cached command first, otherwise deserialize
public T deserialize(AccordCommandStore commandStore, DataInputPlus
in) throws IOException
{
- Version version = deserializeVersion(in);
- TxnId txnId = CommandSerializers.txnId.deserialize(in,
version.msg_version);
+ AccordSerializerVersion version = deserializeVersion(in);
+ TxnId txnId = CommandSerializers.txnId.deserialize(in,
version.msgVersion);
AsyncContext context = commandStore.getContext();
T command = getCachedFull(txnId, context);
if (command != null)
return command;
- Status status = CommandSerializers.status.deserialize(in,
version.msg_version);
- Txn txn = deserializeNullable(in, version.msg_version,
CommandSerializers.txn);
- Timestamp executeAt = deserializeNullable(in, version.msg_version,
CommandSerializers.timestamp);
+ Status status = CommandSerializers.status.deserialize(in,
version.msgVersion);
+ Txn txn = deserializeNullable(in, version.msgVersion,
CommandSerializers.txn);
+ Timestamp executeAt = deserializeNullable(in, version.msgVersion,
CommandSerializers.timestamp);
T partial = deserializeBody(txnId, txn, executeAt, status, in,
version);
addToContext(partial, context);
return partial;
@@ -313,19 +286,19 @@ public class AccordPartialCommand implements
PartialCommand
}
}
- public int serializedSize(T command, Version version)
+ public int serializedSize(T command, AccordSerializerVersion version)
{
- int size = TypeSizes.sizeof(version.version);
+ int size =
Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version));
size += CommandSerializers.txnId.serializedSize();
- size += CommandSerializers.status.serializedSize(command.status(),
version.msg_version);
- size += serializedSizeNullable(command.txn(), version.msg_version,
CommandSerializers.txn);
- size += serializedSizeNullable(command.executeAt(),
version.msg_version, CommandSerializers.timestamp);
+ size += CommandSerializers.status.serializedSize(command.status(),
version.msgVersion);
+ size += serializedSizeNullable(command.txn(), version.msgVersion,
CommandSerializers.txn);
+ size += serializedSizeNullable(command.executeAt(),
version.msgVersion, CommandSerializers.timestamp);
return size;
}
public abstract T getCachedFull(TxnId txnId, AsyncContext context);
public abstract void addToContext(T command, AsyncContext context);
- public abstract T deserializeBody(TxnId txnId, Txn txn, Timestamp
executeAt, Status status, DataInputPlus in, Version version) throws IOException;
+ public abstract T deserializeBody(TxnId txnId, Txn txn, Timestamp
executeAt, Status status, DataInputPlus in, AccordSerializerVersion version)
throws IOException;
/**
* Determines if current modifications require updating command data
duplicated elsewhere
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java
b/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java
new file mode 100644
index 0000000000..cb6f8000f1
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.MessageVersionProvider;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+public enum AccordSerializerVersion implements MessageVersionProvider
+{
+ // If MessagingService version bumps, this mapping does not need to be
updated; only updates needed are those that
+ // include accord serializer changes.
+ V1(1, MessagingService.VERSION_40);
+
+ public static final AccordSerializerVersion CURRENT = V1;
+ public static final Serializer serializer = new Serializer();
+
+ public final int version;
+ public final int msgVersion;
+
+ AccordSerializerVersion(int version, int msgVersion)
+ {
+ this.version = version;
+ this.msgVersion = msgVersion;
+ }
+
+ public static AccordSerializerVersion fromVersion(int version)
+ {
+ switch (version)
+ {
+ case 1:
+ return V1;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ public static AccordSerializerVersion fromMessageVersion(int version)
+ {
+ AccordSerializerVersion[] versions = values();
+ for (int i = versions.length - 1; i >= 0; i--)
+ {
+ AccordSerializerVersion v = versions[i];
+ // If network version bumped (12 to 13), the accord serializers
may not have been changed; use the largest
+ // version smaller than or equal to this version
+ if (v.msgVersion <= version)
+ return v;
+ }
+ throw new IllegalArgumentException("Attempted to use message version "
+ version + " which is smaller than " + versions[0] + " can handle (" +
versions[0].msgVersion + ")");
+ }
+
+ @Override
+ public int messageVersion()
+ {
+ return msgVersion;
+ }
+
+ public static class Serializer implements
IVersionedSerializer<AccordSerializerVersion>
+ {
+ @Override
+ public void serialize(AccordSerializerVersion t, DataOutputPlus out,
int version) throws IOException
+ {
+ serialize(t, out);
+ }
+
+ public void serialize(AccordSerializerVersion t, DataOutputPlus out)
throws IOException
+ {
+ out.writeUnsignedVInt(t.version);
+ }
+
+ @Override
+ public AccordSerializerVersion deserialize(DataInputPlus in, int
version) throws IOException
+ {
+ return deserialize(in);
+ }
+
+ public AccordSerializerVersion deserialize(DataInputPlus in) throws
IOException
+ {
+ return fromVersion(Math.toIntExact(in.readUnsignedVInt()));
+ }
+
+ @Override
+ public long serializedSize(AccordSerializerVersion t, int version)
+ {
+ return serializedSize(t);
+ }
+
+ public long serializedSize(AccordSerializerVersion t)
+ {
+ return TypeSizes.sizeofUnsignedVInt(t.version);
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index 0a04a6e550..6b3f2b8f02 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.txn.Txn;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
@@ -106,24 +105,24 @@ public class AccordStateCache
private Node<?, ?> prev;
private Node<?, ?> next;
private int references = 0;
- private long lastQueriedSize = 0;
+ private long lastQueriedEstimatedSizeOnHeap = 0;
Node(V value)
{
this.value = value;
}
- long size()
+ long estimatedSizeOnHeap()
{
long result = EMPTY_SIZE + value.estimatedSizeOnHeap();
- lastQueriedSize = result;
+ lastQueriedEstimatedSizeOnHeap = result;
return result;
}
- long sizeDelta()
+ long estimatedSizeOnHeapDelta()
{
- long prevSize = lastQueriedSize;
- return size() - prevSize;
+ long prevSize = lastQueriedEstimatedSizeOnHeap;
+ return estimatedSizeOnHeap() - prevSize;
}
K key()
@@ -184,7 +183,7 @@ public class AccordStateCache
if (prev == null)
{
- Preconditions.checkState(head == node);
+ Preconditions.checkState(head == node, "previous is null but the
head isnt the provided node!");
head = next;
}
else
@@ -194,7 +193,7 @@ public class AccordStateCache
if (next == null)
{
- Preconditions.checkState(tail == node);
+ Preconditions.checkState(tail == node, "next is null but the tail
isnt the provided node!");
tail = prev;
}
else
@@ -224,15 +223,16 @@ public class AccordStateCache
private void updateSize(Node<?, ?> node)
{
- bytesCached += node.sizeDelta();
+ bytesCached += node.estimatedSizeOnHeapDelta();
}
// don't evict if there's an outstanding save future. If an item is
evicted then reloaded
// before it's mutation is applied, out of date info will be loaded
private boolean canEvict(Object key)
{
+ // getFuture only returns a future if it is running, so don't need to
check if its still running
Future<?> future = getFuture(saveFutures, key);
- return future == null || future.isDone();
+ return future == null;
}
private void maybeEvict()
@@ -255,7 +255,7 @@ public class AccordStateCache
logger.trace("Evicting {} {}",
evict.value.getClass().getSimpleName(), evict.key());
unlink(evict);
cache.remove(evict.key());
- bytesCached -= evict.size();
+ bytesCached -= evict.estimatedSizeOnHeap();
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 2cdb099394..cc824b3890 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -139,7 +139,7 @@ public class AsyncLoader
}
catch (Throwable t)
{
- logger.error(String.format("Exception loading %s for %s",
command.txnId(), callback), t);
+ logger.error("Exception loading {} for {}", command.txnId(),
callback, t);
throw t;
}
});
@@ -157,7 +157,7 @@ public class AsyncLoader
}
catch (Throwable t)
{
- logger.error(String.format("Exception loading %s for %s",
cfk.key(), callback), t);
+ logger.error("Exception loading {} for {}", cfk.key(),
callback, t);
throw t;
}
});
@@ -231,7 +231,7 @@ public class AsyncLoader
case FINISHED:
break;
default:
- throw new IllegalStateException();
+ throw new IllegalStateException("Unexpected state: " + state);
}
logger.trace("Exiting load for {} with state {}: {} {}", callback,
state, txnIds, keys);
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index ca2ea8617d..5fa51425a5 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -96,7 +96,7 @@ public abstract class AsyncOperation<R> extends
AsyncPromise<R> implements Runna
@Override
public String toString()
{
- return "AsyncOperation{" + state + "}-0x" +
Integer.toHexString(System.identityHashCode(this));
+ return "AsyncOperation{" + state + "}-" + loggingId;
}
AsyncWriter createAsyncWriter(AccordCommandStore commandStore)
@@ -182,20 +182,26 @@ public abstract class AsyncOperation<R> extends
AsyncPromise<R> implements Runna
{
setLoggingIds();
logger.trace("Running {} with state {}", this, state);
- commandStore.checkInStoreThread();
- commandStore.setContext(context);
try
{
- runInternal();
- }
- catch (Throwable t)
- {
- logger.error(String.format("Operation %s failed", this), t);
- tryFailure(t);
+ commandStore.checkInStoreThread();
+ commandStore.setContext(context);
+ try
+ {
+ runInternal();
+ }
+ catch (Throwable t)
+ {
+ logger.error(String.format("Operation %s failed", this), t);
+ tryFailure(t);
+ }
+ finally
+ {
+ commandStore.unsetContext(context);
+ }
}
finally
{
- commandStore.unsetContext(context);
logger.trace("Exiting {}", this);
clearLoggingIds();
}
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index 750aa93e12..04a63a1670 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -65,8 +65,8 @@ public class AsyncWriter
private State state = State.INITIALIZED;
protected Future<?> writeFuture;
private final AccordCommandStore commandStore;
- AccordStateCache.Instance<TxnId, AccordCommand> commandCache;
- AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCache;
+ final AccordStateCache.Instance<TxnId, AccordCommand> commandCache;
+ final AccordStateCache.Instance<PartitionKey, AccordCommandsForKey>
cfkCache;
public AsyncWriter(AccordCommandStore commandStore)
{
@@ -97,7 +97,8 @@ public class AsyncWriter
continue;
}
- if (futures == null) futures = new ArrayList<>();
+ if (futures == null)
+ futures = new ArrayList<>();
K key = item.key();
Mutation mutation = mutationFunction.apply(item, timestamp);
if (logger.isTraceEnabled())
@@ -124,7 +125,8 @@ public class AsyncWriter
for (AccordState.WriteOnly<K, V> item : ctxGroup.writeOnly.values())
{
Preconditions.checkState(item.hasModifications());
- if (futures == null) futures = new ArrayList<>();
+ if (futures == null)
+ futures = new ArrayList<>();
Mutation mutation = mutationFunction.apply((V) item, timestamp);
Future<?> future = Stage.MUTATION.submit((Runnable)
mutation::apply);
future.addListener(() -> cache.purgeWriteOnly(item.key()),
commandStore.executor());
@@ -238,7 +240,7 @@ public class AsyncWriter
}
// There won't be a txn to denormalize against until the command has
been preaccepted
- if (command.status().hasBeen(Status.PreAccepted) &&
AccordPartialCommand.WithDeps.serializer.needsUpdate(command))
+ if (command.status().hasBeen(Status.PreAccepted) &&
AccordPartialCommand.WithDeps.serializer.needsUpdate(command) &&
!(command.txn() == null && command.status().isInvalidated()))
{
for (Key key : command.txn().keys())
{
@@ -306,7 +308,7 @@ public class AsyncWriter
case FINISHED:
break;
default:
- throw new IllegalStateException();
+ throw new IllegalStateException("Unexpected state: " +
state);
}
}
catch (IOException e)
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index 26a27ed69e..7760c81e7d 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -104,7 +104,7 @@ public class AcceptSerializers
public void serialize(AcceptOk acceptOk, DataOutputPlus out, int
version) throws IOException
{
CommandSerializers.txnId.serialize(acceptOk.txnId, out, version);
- CommandSerializers.deps.serialize(acceptOk.deps, out, version);
+ NullableSerializer.serializeNullable(acceptOk.deps, out, version,
CommandSerializers.deps);
}
@@ -112,14 +112,14 @@ public class AcceptSerializers
public AcceptOk deserialize(DataInputPlus in, int version) throws
IOException
{
return new AcceptOk(CommandSerializers.txnId.deserialize(in,
version),
- CommandSerializers.deps.deserialize(in,
version));
+ NullableSerializer.deserializeNullable(in,
version, CommandSerializers.deps));
}
@Override
public long serializedSize(AcceptOk acceptOk, int version)
{
return CommandSerializers.txnId.serializedSize(acceptOk.txnId,
version)
- + CommandSerializers.deps.serializedSize(acceptOk.deps,
version);
+ + NullableSerializer.serializedSizeNullable(acceptOk.deps,
version, CommandSerializers.deps);
}
};
diff --git
a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java
b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java
index a57074db27..071fd25a3a 100644
---
a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java
+++
b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java
@@ -61,7 +61,8 @@ public class InterceptClasses implements BiFunction<String,
byte[], byte[]>
"|org[/.]apache[/.]cassandra[/.]db.streaming[/.].*" +
"|org[/.]apache[/.]cassandra[/.]distributed[/.]impl[/.]DirectStreamingConnectionFactory.*"
+
"|org[/.]apache[/.]cassandra[/.]db[/.]commitlog[/.].*" +
-
"|org[/.]apache[/.]cassandra[/.]service[/.]paxos[/.].*");
+
"|org[/.]apache[/.]cassandra[/.]service[/.]paxos[/.].*" +
+ "|accord[/.].*");
private static final Pattern GLOBAL_METHODS =
Pattern.compile("org[/.]apache[/.]cassandra[/.](?!simulator[/.]).*" +
"|org[/.]apache[/.]cassandra[/.]simulator[/.]test[/.].*" +
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
index 341ed63ed1..eb3df1734d 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
@@ -376,6 +376,7 @@ public class SimulationRunner
catch (Throwable t)
{
logger().error("Failed on seed 0x{}",
Long.toHexString(seed), t);
+ throw t;
}
}
}
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java
b/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java
index 5be3384eeb..23fe5eaed6 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java
@@ -40,9 +40,10 @@ public class SimulatorUtils
public static void dumpStackTraces(Logger logger)
{
Map<Thread, StackTraceElement[]> threadMap =
Thread.getAllStackTraces();
- threadMap.forEach((thread, ste) -> {
- logger.error("{}:\n {}", thread, Threads.prettyPrint(ste, false,
" ", "\n", ""));
- });
+ String prefix = " ";
+ String delimiter = "\n" + prefix;
+ threadMap.forEach((thread, ste) ->
+ logger.error("{}:\n{}", thread,
Threads.prettyPrint(ste, false, prefix, delimiter, "")));
FastThreadLocal.destroy();
}
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 503afafc38..1351117999 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.service.accord.async;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -66,20 +65,6 @@ public class AsyncLoaderTest
StorageService.instance.initServer();
}
- private static void load(AccordCommandStore commandStore, AsyncContext
context, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
- {
- execute(commandStore, () ->
- {
- AsyncLoader loader = new AsyncLoader(commandStore, txnIds, keys);
- while (!loader.load(context, (o, t) -> Assert.assertNull(t)));
- });
- }
-
- private static void load(AccordCommandStore commandStore, AsyncContext
context, TxnId... txnIds)
- {
- load(commandStore, context, ImmutableList.copyOf(txnIds),
Collections.emptyList());
- }
-
/**
* Loading a cached resource shoudln't block
*/
@@ -118,7 +103,7 @@ public class AsyncLoaderTest
* Loading a cached resource should block
*/
@Test
- public void loadTest() throws Throwable
+ public void loadTest()
{
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
@@ -160,7 +145,7 @@ public class AsyncLoaderTest
* Test when some resources are cached and others need to be loaded
*/
@Test
- public void partialLoadTest() throws Throwable
+ public void partialLoadTest()
{
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 6ec036aa7b..4d788b6865 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
import accord.local.Command;
import accord.local.PartialCommand;
-import accord.local.PreLoadContext;
import accord.local.Status;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -77,7 +76,7 @@ public class AsyncWriterTest
}
@Test
- public void waitingOnDenormalization() throws Throwable
+ public void waitingOnDenormalization()
{
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]