This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9327165d407dd4ea4c9af5b568ecd2a95474fa30 Author: David Capwell <[email protected]> AuthorDate: Fri Oct 21 16:29:28 2022 -0700 Command table now uses a local version added to the payload rather than rely on a version column, and fixed a few feedback related issues patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-17103 --- .build/include-accord.sh | 4 +- .../db/filter/AbstractClusteringIndexFilter.java | 1 - .../db/filter/ClusteringIndexSliceFilter.java | 1 - .../apache/cassandra/db/partitions/Partition.java | 2 - .../cassandra/io/LocalVersionedSerializer.java | 94 +++++++++++++ .../cassandra/io/MessageVersionProvider.java | 24 ++++ .../apache/cassandra/net/ResponseVerbHandler.java | 2 +- .../cassandra/service/accord/AccordCommand.java | 1 + .../service/accord/AccordCommandStore.java | 11 +- .../service/accord/AccordCommandsForKey.java | 2 +- .../cassandra/service/accord/AccordKeyspace.java | 145 ++++++++------------- .../service/accord/AccordMessageSink.java | 3 +- .../service/accord/AccordPartialCommand.java | 85 +++++------- .../service/accord/AccordSerializerVersion.java | 114 ++++++++++++++++ .../cassandra/service/accord/AccordStateCache.java | 24 ++-- .../service/accord/async/AsyncLoader.java | 6 +- .../service/accord/async/AsyncOperation.java | 26 ++-- .../service/accord/async/AsyncWriter.java | 14 +- .../accord/serializers/AcceptSerializers.java | 6 +- .../cassandra/simulator/asm/InterceptClasses.java | 3 +- .../cassandra/simulator/SimulationRunner.java | 1 + .../apache/cassandra/simulator/SimulatorUtils.java | 7 +- .../service/accord/async/AsyncLoaderTest.java | 19 +-- .../service/accord/async/AsyncWriterTest.java | 3 +- 24 files changed, 382 insertions(+), 216 deletions(-) diff --git a/.build/include-accord.sh b/.build/include-accord.sh index 7069e22b6f..0dc01bf2ba 100755 --- a/.build/include-accord.sh +++ b/.build/include-accord.sh @@ -24,8 +24,8 @@ set -o nounset bin="$(cd "$(dirname "$0")" > /dev/null; pwd)" 
-accord_repo='https://github.com/bdeggleston/cassandra-accord.git' -accord_branch='metadata-persistence' +accord_repo='https://github.com/apache/cassandra-accord.git' +accord_branch='trunk' accord_src="$bin/cassandra-accord" checkout() { diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java index e1792d17f8..ddcaaed812 100644 --- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java +++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.filter; import java.io.IOException; -import java.util.Objects; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java index 1f4b72c810..178c96b453 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java @@ -20,7 +20,6 @@ package org.apache.cassandra.db.filter; import java.io.IOException; import java.util.List; import java.nio.ByteBuffer; -import java.util.Objects; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java index 1daf20cd00..601934a8e7 100644 --- a/src/java/org/apache/cassandra/db/partitions/Partition.java +++ b/src/java/org/apache/cassandra/db/partitions/Partition.java @@ -21,12 +21,10 @@ import java.util.NavigableSet; import javax.annotation.Nullable; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import 
org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.service.accord.api.AccordKey; /** * In-memory representation of a Partition. diff --git a/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java b/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java new file mode 100644 index 0000000000..739f007846 --- /dev/null +++ b/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Serializer that stores the version within the buffer. Normal usage of {@link IVersionedSerializer} is to rely on + * {@link org.apache.cassandra.net.MessagingService#current_version} and messaging version numbers, but that implies + * that a messaging version bump is required if a change is made to this field. In some cases the serializer isn't + * dealing with messages but with blobs stored in a table; in these cases it may be better to rely on + * field-specific versioning that gets stored alongside the data. 
+ */ +public class LocalVersionedSerializer<I> +{ + private final MessageVersionProvider currentVersion; + private final IVersionedSerializer<MessageVersionProvider> versionSerializer; + private final IVersionedSerializer<I> serializer; + + public <V extends MessageVersionProvider> LocalVersionedSerializer(V currentVersion, + IVersionedSerializer<V> versionSerializer, + IVersionedSerializer<I> serializer) + { + // V is local to the constructor so the types are validated at construction time, but we don't want it in the type + // signature of the class as it just gets verbose... + this.currentVersion = Objects.requireNonNull(currentVersion); + this.versionSerializer = (IVersionedSerializer<MessageVersionProvider>) Objects.requireNonNull(versionSerializer); + this.serializer = Objects.requireNonNull(serializer); + } + + public IVersionedSerializer<I> serializer() + { + return serializer; + } + + /** + * Serialize the current version followed by the specified object into the specified DataOutputPlus instance. + * + * @param t object that needs to be serialized + * @param out DataOutput into which serialization needs to happen. + * @throws IOException if serialization fails + */ + public void serialize(I t, DataOutputPlus out) throws IOException + { + versionSerializer.serialize(currentVersion, out, currentVersion.messageVersion()); + serializer.serialize(t, out, currentVersion.messageVersion()); + } + + /** + * Deserialize from the specified DataInputPlus instance, using the version stored ahead of the data. + * + * @param in DataInput from which deserialization needs to happen. + * @return the object that was deserialized + * @throws IOException if deserialization fails + */ + public I deserialize(DataInputPlus in) throws IOException + { + MessageVersionProvider version = versionSerializer.deserialize(in, currentVersion.messageVersion()); + return serializer.deserialize(in, version.messageVersion()); + } + + /** + * Calculate serialized size of object without actually serializing. 
+ * + * @param t object to calculate serialized size + * @return serialized size of object t + */ + public long serializedSize(I t) + { + long size = versionSerializer.serializedSize(currentVersion, currentVersion.messageVersion()); + size += serializer.serializedSize(t, currentVersion.messageVersion()); + return size; + } +} diff --git a/src/java/org/apache/cassandra/io/MessageVersionProvider.java b/src/java/org/apache/cassandra/io/MessageVersionProvider.java new file mode 100644 index 0000000000..a6ad468281 --- /dev/null +++ b/src/java/org/apache/cassandra/io/MessageVersionProvider.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io; + +public interface MessageVersionProvider +{ + int messageVersion(); +} diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 3518888591..1cee468cd3 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -26,7 +26,7 @@ import org.apache.cassandra.tracing.Tracing; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; -public class ResponseVerbHandler implements IVerbHandler +class ResponseVerbHandler implements IVerbHandler { public static final ResponseVerbHandler instance = new ResponseVerbHandler(); diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java index 3c373e60df..00feea4c4e 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java @@ -184,6 +184,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> // ", txn=" + txn + // ", writes=" + writes + // ", result=" + result + + ", txn is null?=" + (txn.get() == null) + ", isGloballyPersistent=" + isGloballyPersistent + ", waitingOnCommit=" + waitingOnCommit + ", waitingOnApply=" + waitingOnApply + diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index f8659e0b3c..ce6c8b0e68 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.accord; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.TimeUnit; @@ -172,7 +173,8 @@ public class AccordCommandStore extends CommandStore public Command command(TxnId txnId) { AccordCommand command = getCommandInternal(txnId); - if (command.isEmpty()) command.initialize(); + if (command.isEmpty()) + command.initialize(); return command; } @@ -190,8 +192,9 @@ public class AccordCommandStore extends CommandStore private AccordCommandsForKey getCommandsForKeyInternal(Key key) { - Preconditions.checkState(currentCtx != null); - Preconditions.checkArgument(key instanceof PartitionKey); + Objects.requireNonNull(currentCtx, "current context"); + if (!(key instanceof PartitionKey)) + throw new IllegalArgumentException("Attempted to use non-PartitionKey; given " + key.getClass()); AccordCommandsForKey commandsForKey = currentCtx.commandsForKey.get((PartitionKey) key); if (commandsForKey == null) throw new IllegalArgumentException("No commandsForKey in context for key " + key); @@ -263,7 +266,7 @@ public class AccordCommandStore extends CommandStore } catch (ExecutionException e) { - throw new RuntimeException(e); + throw new RuntimeException(e.getCause()); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java index 54ce46a3f9..d2d84899fc 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java @@ -408,7 +408,7 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState< // we use the executeAt time instead of the monotonic database timestamp to prevent uneven // ttl expiration in extreme cases, ie 1M+ writes/second to a key causing timestamps to overflow // into the next second on some keys and not others. 
- return (int) TimeUnit.MICROSECONDS.toSeconds(getTimestampMicros(lastExecutedTimestamp.get())); + return Math.toIntExact(TimeUnit.MICROSECONDS.toSeconds(getTimestampMicros(lastExecutedTimestamp.get()))); } public long timestampMicrosFor(Timestamp executeAt, boolean isForWriteTxn) diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index d2c3ef6928..d37f39a32c 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -33,8 +33,6 @@ import java.util.function.Supplier; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +44,8 @@ import accord.primitives.Ballot; import accord.primitives.Deps; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.txn.Txn; +import accord.txn.Writes; import accord.utils.DeterministicIdentitySet; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.UntypedResultSet; @@ -80,9 +80,9 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.transform.FilteredPartitions; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.LocalVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Functions; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -115,7 +115,6 @@ public class AccordKeyspace private static final Logger logger = LoggerFactory.getLogger(AccordKeyspace.class); public static final String COMMANDS = "commands"; 
- public static final String COMMAND_SERIES = "command_series"; public static final String COMMANDS_FOR_KEY = "commands_for_key"; private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, int, bigint>"; @@ -134,20 +133,14 @@ public class AccordKeyspace + format("txn_id %s,", TIMESTAMP_TUPLE) + "status int," + "home_key blob," - + "home_key_version int," + "progress_key blob," - + "progress_key_version blob," + "is_globally_persistent boolean," - + "txn_version int," + "txn blob," + format("execute_at %s,", TIMESTAMP_TUPLE) + format("promised_ballot %s,", TIMESTAMP_TUPLE) + format("accepted_ballot %s,", TIMESTAMP_TUPLE) - + "dependencies_version int," + "dependencies blob," - + "writes_version int," + "writes blob," - + "result_version int," + "result blob," + format("waiting_on_commit map<%s, blob>,", TIMESTAMP_TUPLE) + format("waiting_on_apply map<%s, blob>,", TIMESTAMP_TUPLE) @@ -157,6 +150,20 @@ public class AccordKeyspace + "PRIMARY KEY((store_generation, store_index, txn_id))" + ')'); + private static class CommandsSerializers + { + static final LocalVersionedSerializer<AccordKey> ACCORD_KEY_SERIALIZER = localSerializer(AccordKey.serializer); + static final LocalVersionedSerializer<Txn> TXN_SERIALIZER = localSerializer(CommandSerializers.txn); + static final LocalVersionedSerializer<Deps> DEPS_SERIALIZER = localSerializer(CommandSerializers.deps); + static final LocalVersionedSerializer<Writes> WRITES_SERIALIZER = localSerializer(CommandSerializers.writes); + static final LocalVersionedSerializer<AccordData> RESULT_DATA_SERIALIZER = localSerializer(AccordData.serializer); + + private static <T> LocalVersionedSerializer<T> localSerializer(IVersionedSerializer<T> serializer) + { + return new LocalVersionedSerializer<>(AccordSerializerVersion.CURRENT, AccordSerializerVersion.serializer, serializer); + } + } + private static ColumnMetadata getColumn(TableMetadata metadata, String name) { ColumnMetadata column = metadata.getColumn(new 
ColumnIdentifier(name, true)); @@ -170,20 +177,14 @@ public class AccordKeyspace static final ClusteringComparator keyComparator = Commands.partitionKeyAsClusteringComparator(); static final ColumnMetadata status = getColumn(Commands, "status"); static final ColumnMetadata home_key = getColumn(Commands, "home_key"); - static final ColumnMetadata home_key_version = getColumn(Commands, "home_key_version"); static final ColumnMetadata progress_key = getColumn(Commands, "progress_key"); - static final ColumnMetadata progress_key_version = getColumn(Commands, "progress_key_version"); static final ColumnMetadata is_globally_persistent = getColumn(Commands, "is_globally_persistent"); - static final ColumnMetadata txn_version = getColumn(Commands, "txn_version"); static final ColumnMetadata txn = getColumn(Commands, "txn"); static final ColumnMetadata execute_at = getColumn(Commands, "execute_at"); static final ColumnMetadata promised_ballot = getColumn(Commands, "promised_ballot"); static final ColumnMetadata accepted_ballot = getColumn(Commands, "accepted_ballot"); - static final ColumnMetadata dependencies_version = getColumn(Commands, "dependencies_version"); static final ColumnMetadata dependencies = getColumn(Commands, "dependencies"); - static final ColumnMetadata writes_version = getColumn(Commands, "writes_version"); static final ColumnMetadata writes = getColumn(Commands, "writes"); - static final ColumnMetadata result_version = getColumn(Commands, "result_version"); static final ColumnMetadata result = getColumn(Commands, "result"); static final ColumnMetadata waiting_on_commit = getColumn(Commands, "waiting_on_commit"); static final ColumnMetadata waiting_on_apply = getColumn(Commands, "waiting_on_apply"); @@ -235,15 +236,21 @@ public class AccordKeyspace return commandsForKey.maxTimestamp.hasModifications() || commandsForKey.lastExecutedTimestamp.hasModifications() || commandsForKey.lastExecutedMicros.hasModifications() + || 
commandsForKey.lastWriteTimestamp.hasModifications() || commandsForKey.blindWitnessed.hasModifications(); } + private static boolean hasRegularChanges(AccordCommandsForKey commandsForKey) + { + return commandsForKey.uncommitted.map.hasModifications() + || commandsForKey.committedById.map.hasModifications() + || commandsForKey.committedByExecuteAt.map.hasModifications(); + } + static RegularAndStaticColumns columnsFor(AccordCommandsForKey commandsForKey) { boolean hasStaticChanges = hasStaticChanges(commandsForKey); - boolean hasRegularChanges = commandsForKey.uncommitted.map.hasAdditions() - || commandsForKey.committedById.map.hasAdditions() - || commandsForKey.committedByExecuteAt.map.hasAdditions(); + boolean hasRegularChanges = hasRegularChanges(commandsForKey); if (hasStaticChanges && hasRegularChanges) return all; @@ -252,7 +259,7 @@ public class AccordKeyspace else if (hasRegularChanges) return justRegular; else - throw new IllegalArgumentException(); + throw new IllegalArgumentException("CommandsForKey has_modifications=" + commandsForKey.hasModifications() + ", but no Static or Regular columns changed!"); } } @@ -275,41 +282,34 @@ public class AccordKeyspace return Tables.of(Commands, CommandsForKey); } - private static <T> ByteBuffer serialize(T obj, IVersionedSerializer<T> serializer, int version) throws IOException + private static <T> ByteBuffer serialize(T obj, LocalVersionedSerializer<T> serializer) throws IOException { - int size = (int) serializer.serializedSize(obj, version); + int size = (int) serializer.serializedSize(obj); try (DataOutputBuffer out = new DataOutputBuffer(size)) { - serializer.serialize(obj, out, version); - assert size == out.buffer().limit(); - return out.buffer(); + serializer.serialize(obj, out); + ByteBuffer bb = out.buffer(); + assert size == bb.limit() : String.format("Expected to write %d but wrote %d", size, bb.limit()); + return bb; } } - private static <T> ByteBuffer serializeOrNull(T obj, IVersionedSerializer<T> 
serializer, int version) throws IOException + private static <T> ByteBuffer serializeOrNull(T obj, LocalVersionedSerializer<T> serializer) throws IOException { - return obj != null ? serialize(obj, serializer, version) : EMPTY_BYTE_BUFFER; + return obj != null ? serialize(obj, serializer) : EMPTY_BYTE_BUFFER; } - private static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> serializer, int version) throws IOException + private static <T> T deserialize(ByteBuffer bytes, LocalVersionedSerializer<T> serializer) throws IOException { try (DataInputBuffer in = new DataInputBuffer(bytes, true)) { - return serializer.deserialize(in, version); + return serializer.deserialize(in); } } - private static <T> T deserializeOrNull(ByteBuffer bytes, IVersionedSerializer<T> serializer, int version) throws IOException + private static <T> T deserializeOrNull(ByteBuffer bytes, LocalVersionedSerializer<T> serializer) throws IOException { - return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? deserialize(bytes, serializer, version) : null; - } - - private static Map<ByteBuffer, ByteBuffer> serializeWaitingOn(Map<TxnId, ByteBuffer> waitingOn) - { - Map<ByteBuffer, ByteBuffer> result = Maps.newHashMapWithExpectedSize(waitingOn.size()); - for (Map.Entry<TxnId, ByteBuffer> entry : waitingOn.entrySet()) - result.put(serializeTimestamp(entry.getKey()), entry.getValue()); - return result; + return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? 
deserialize(bytes, serializer) : null; } private static NavigableMap<TxnId, ByteBuffer> deserializeWaitingOn(Map<ByteBuffer, ByteBuffer> serialized) @@ -345,16 +345,6 @@ public class AccordKeyspace return deserializeTimestampSet(row.getSet(name, BytesType.instance), TreeSet::new, TxnId::new); } - public static Set<ByteBuffer> serializeListeners(Set<ListenerProxy> listeners) - { - Set<ByteBuffer> result = Sets.newHashSetWithExpectedSize(listeners.size()); - for (ListenerProxy listener : listeners) - { - result.add(listener.identifier()); - } - return result; - } - private static DeterministicIdentitySet<ListenerProxy> deserializeListeners(CommandStore commandStore, Set<ByteBuffer> serialized) throws IOException { if (serialized == null || serialized.isEmpty()) @@ -433,32 +423,22 @@ public class AccordKeyspace Row.Builder builder = BTreeRow.unsortedBuilder(); builder.newRow(Clustering.EMPTY); int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros); - int version = MessagingService.current_version; - ByteBuffer versionBytes = accessor.valueOf(version); + if (command.status.hasModifications()) builder.addCell(live(CommandsColumns.status, timestampMicros, accessor.valueOf(command.status.get().ordinal()))); if (command.homeKey.hasModifications()) - { - builder.addCell(live(CommandsColumns.home_key_version, timestampMicros, versionBytes)); - builder.addCell(live(CommandsColumns.home_key, timestampMicros, serializeOrNull((AccordKey) command.homeKey.get(), AccordKey.serializer, version))); - } + builder.addCell(live(CommandsColumns.home_key, timestampMicros, serializeOrNull((AccordKey) command.homeKey.get(), CommandsSerializers.ACCORD_KEY_SERIALIZER))); if (command.progressKey.hasModifications()) - { - builder.addCell(live(CommandsColumns.progress_key_version, timestampMicros, versionBytes)); - builder.addCell(live(CommandsColumns.progress_key, timestampMicros, serializeOrNull((AccordKey) command.progressKey.get(), AccordKey.serializer, version))); - } + 
builder.addCell(live(CommandsColumns.progress_key, timestampMicros, serializeOrNull((AccordKey) command.progressKey.get(), CommandsSerializers.ACCORD_KEY_SERIALIZER))); if (command.isGloballyPersistent.hasModifications()) builder.addCell(live(CommandsColumns.is_globally_persistent, timestampMicros, accessor.valueOf(command.isGloballyPersistent.get()))); if (command.txn.hasModifications()) - { - builder.addCell(live(CommandsColumns.txn_version, timestampMicros, versionBytes)); - builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.txn.get(), CommandSerializers.txn, version))); - } + builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.txn.get(), CommandsSerializers.TXN_SERIALIZER))); if (command.executeAt.hasModifications()) builder.addCell(live(CommandsColumns.execute_at, timestampMicros, serializeTimestamp(command.executeAt.get()))); @@ -470,22 +450,13 @@ public class AccordKeyspace builder.addCell(live(CommandsColumns.accepted_ballot, timestampMicros, serializeTimestamp(command.accepted.get()))); if (command.deps.hasModifications()) - { - builder.addCell(live(CommandsColumns.dependencies_version, timestampMicros, versionBytes)); - builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serialize(command.deps.get(), CommandSerializers.deps, version))); - } + builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serialize(command.deps.get(), CommandsSerializers.DEPS_SERIALIZER))); if (command.writes.hasModifications()) - { - builder.addCell(live(CommandsColumns.writes_version, timestampMicros, versionBytes)); - builder.addCell(live(CommandsColumns.writes, timestampMicros, serialize(command.writes.get(), CommandSerializers.writes, version))); - } + builder.addCell(live(CommandsColumns.writes, timestampMicros, serialize(command.writes.get(), CommandsSerializers.WRITES_SERIALIZER))); if (command.result.hasModifications()) - { - builder.addCell(live(CommandsColumns.result_version, 
timestampMicros, versionBytes)); - builder.addCell(live(CommandsColumns.result, timestampMicros, serialize((AccordData) command.result.get(), AccordData.serializer, version))); - } + builder.addCell(live(CommandsColumns.result, timestampMicros, serialize((AccordData) command.result.get(), CommandsSerializers.RESULT_DATA_SERIALIZER))); if (command.waitingOnCommit.hasModifications()) { @@ -535,7 +506,6 @@ public class AccordKeyspace private static ByteBuffer serializeKey(PartitionKey key) { - UUIDSerializer.instance.serialize(key.tableId().asUUID()); return TupleType.buildValue(new ByteBuffer[]{UUIDSerializer.instance.serialize(key.tableId().asUUID()), key.partitionKey().getKey()}); } @@ -570,12 +540,12 @@ public class AccordKeyspace return command; } - private static <T> T deserializeWithVersionOr(UntypedResultSet.Row row, String dataColumn, String versionColumn, IVersionedSerializer<T> serializer, Supplier<T> defaultSupplier) throws IOException + private static <T> T deserializeWithVersionOr(UntypedResultSet.Row row, String dataColumn, LocalVersionedSerializer<T> serializer, Supplier<T> defaultSupplier) throws IOException { - if (!row.has(versionColumn)) + if (!row.has(dataColumn)) return defaultSupplier.get(); - return deserialize(row.getBlob(dataColumn), serializer, row.getInt(versionColumn)); + return deserialize(row.getBlob(dataColumn), serializer); } public static UntypedResultSet loadCommandRow(CommandStore commandStore, TxnId txnId) @@ -598,11 +568,6 @@ public class AccordKeyspace AccordCommandStore commandStore = command.commandStore(); commandStore.checkNotInStoreThread(); - String cql = "SELECT * FROM %s.%s " + - "WHERE store_generation=? " + - "AND store_index=? 
" + - "AND txn_id=(?, ?, ?, ?)"; - UntypedResultSet result = loadCommandRow(commandStore, command.txnId()); if (result.isEmpty()) @@ -616,16 +581,16 @@ public class AccordKeyspace UntypedResultSet.Row row = result.one(); Preconditions.checkState(deserializeTimestampOrNull(row, "txn_id", TxnId::new).equals(txnId)); command.status.load(Status.values()[row.getInt("status")]); - command.homeKey.load(deserializeOrNull(row.getBlob("home_key"), AccordKey.serializer, row.getInt("home_key_version"))); - command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"), AccordKey.serializer, row.getInt("progress_key_version"))); + command.homeKey.load(deserializeOrNull(row.getBlob("home_key"), CommandsSerializers.ACCORD_KEY_SERIALIZER)); + command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"), CommandsSerializers.ACCORD_KEY_SERIALIZER)); command.isGloballyPersistent.load(row.getBoolean("is_globally_persistent")); - command.txn.load(deserializeOrNull(row.getBlob("txn"), CommandSerializers.txn, row.getInt("txn_version"))); + command.txn.load(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.TXN_SERIALIZER)); command.executeAt.load(deserializeTimestampOrNull(row, "execute_at", Timestamp::new)); command.promised.load(deserializeTimestampOrNull(row, "promised_ballot", Ballot::new)); command.accepted.load(deserializeTimestampOrNull(row, "accepted_ballot", Ballot::new)); - command.deps.load(deserializeWithVersionOr(row, "dependencies", "dependencies_version", CommandSerializers.deps, () -> Deps.NONE)); - command.writes.load(deserializeWithVersionOr(row, "writes", "writes_version", CommandSerializers.writes, () -> null)); - command.result.load(deserializeWithVersionOr(row, "result", "result_version", AccordData.serializer, () -> null)); + command.deps.load(deserializeWithVersionOr(row, "dependencies", CommandsSerializers.DEPS_SERIALIZER, () -> Deps.NONE)); + command.writes.load(deserializeWithVersionOr(row, "writes", 
CommandsSerializers.WRITES_SERIALIZER, () -> null)); + command.result.load(deserializeWithVersionOr(row, "result", CommandsSerializers.RESULT_DATA_SERIALIZER, () -> null)); command.waitingOnCommit.load(deserializeWaitingOn(row, "waiting_on_commit")); command.blockingCommitOn.load(deserializeBlocking(row, "blocking_commit_on")); command.waitingOnApply.load(deserializeWaitingOn(row, "waiting_on_apply")); diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java index b723b27e6e..32c136b1b7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java +++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java @@ -20,6 +20,7 @@ package org.apache.cassandra.service.accord; import java.util.EnumMap; import java.util.Map; +import java.util.Objects; import com.google.common.base.Preconditions; @@ -89,7 +90,7 @@ public class AccordMessageSink implements MessageSink public void send(Node.Id to, Request request) { Verb verb = getVerb(request.type()); - Preconditions.checkArgument(verb != null); + Objects.requireNonNull(verb, "verb"); Message<Request> message = Message.out(verb, request); InetAddressAndPort endpoint = getEndpoint(to); logger.debug("Sending {} {} to {}", verb, message.payload, endpoint); diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java index 1419a7595f..c2c3010e43 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java @@ -32,12 +32,10 @@ import accord.primitives.Deps; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.txn.Txn; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import 
org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.accord.async.AsyncContext; import org.apache.cassandra.service.accord.serializers.CommandSerializers; @@ -62,7 +60,7 @@ public class AccordPartialCommand implements PartialCommand } @Override - public PartialCommand deserializeBody(TxnId txnId, Txn txn, Timestamp executeAt, Status status, DataInputPlus in, Version version) throws IOException + public PartialCommand deserializeBody(TxnId txnId, Txn txn, Timestamp executeAt, Status status, DataInputPlus in, AccordSerializerVersion version) throws IOException { return new AccordPartialCommand(txnId, txn, executeAt, status); } @@ -121,7 +119,8 @@ public class AccordPartialCommand implements PartialCommand public void forEachRemovedListener(Consumer<Listener> consumer) { - removedListeners.forEach(consumer); + if (removedListeners != null) + removedListeners.forEach(consumer); } @Override @@ -168,24 +167,24 @@ public class AccordPartialCommand implements PartialCommand } @Override - public PartialCommand.WithDeps deserializeBody(TxnId txnId, Txn txn, Timestamp executeAt, Status status, DataInputPlus in, Version version) throws IOException + public PartialCommand.WithDeps deserializeBody(TxnId txnId, Txn txn, Timestamp executeAt, Status status, DataInputPlus in, AccordSerializerVersion version) throws IOException { - Deps deps = deserializeNullable(in, version.msg_version, CommandSerializers.deps); + Deps deps = deserializeNullable(in, version.msgVersion, CommandSerializers.deps); return new AccordPartialCommand.WithDeps(txnId, txn, executeAt, status, deps); } @Override - public void serialize(PartialCommand.WithDeps command, DataOutputPlus out, Version version) throws IOException + public void serialize(PartialCommand.WithDeps command, DataOutputPlus out, AccordSerializerVersion version) throws IOException { super.serialize(command, out, 
version); - serializeNullable(command.savedDeps(), out, version.msg_version, CommandSerializers.deps); + serializeNullable(command.savedDeps(), out, version.msgVersion, CommandSerializers.deps); } @Override - public int serializedSize(PartialCommand.WithDeps command, Version version) + public int serializedSize(PartialCommand.WithDeps command, AccordSerializerVersion version) { int size = super.serializedSize(command, version) ; - size += serializedSizeNullable(command.savedDeps(), version.msg_version, CommandSerializers.deps); + size += serializedSizeNullable(command.savedDeps(), version.msgVersion, CommandSerializers.deps); return size; } @@ -228,44 +227,18 @@ public class AccordPartialCommand implements PartialCommand public static abstract class PartialCommandSerializer<T extends PartialCommand> { - public enum Version + public void serialize(T command, DataOutputPlus out, AccordSerializerVersion version) throws IOException { - VERSION_0(0, MessagingService.current_version); - final byte version; - final int msg_version; - - Version(int version, int msg_version) - { - this.version = (byte) version; - this.msg_version = msg_version; - } - - public static final Version current = VERSION_0; - - public static Version fromByte(byte b) - { - switch (b) - { - case 0: - return VERSION_0; - default: - throw new IllegalArgumentException(); - } - } - } - - public void serialize(T command, DataOutputPlus out, Version version) throws IOException - { - out.write(version.version); - CommandSerializers.txnId.serialize(command.txnId(), out, version.msg_version); - CommandSerializers.status.serialize(command.status(), out, version.msg_version); - serializeNullable(command.txn(), out, version.msg_version, CommandSerializers.txn); - serializeNullable(command.executeAt(), out, version.msg_version, CommandSerializers.timestamp); + AccordSerializerVersion.serializer.serialize(version, out); + CommandSerializers.txnId.serialize(command.txnId(), out, version.msgVersion); + 
CommandSerializers.status.serialize(command.status(), out, version.msgVersion); + serializeNullable(command.txn(), out, version.msgVersion, CommandSerializers.txn); + serializeNullable(command.executeAt(), out, version.msgVersion, CommandSerializers.timestamp); } public ByteBuffer serialize(T command) { - Version version = Version.current; + AccordSerializerVersion version = AccordSerializerVersion.CURRENT; int size = serializedSize(command, version); try (DataOutputBuffer out = new DataOutputBuffer(size)) { @@ -278,24 +251,24 @@ public class AccordPartialCommand implements PartialCommand } } - public Version deserializeVersion(DataInputPlus in) throws IOException + public AccordSerializerVersion deserializeVersion(DataInputPlus in) throws IOException { - return Version.fromByte(in.readByte()); + return AccordSerializerVersion.serializer.deserialize(in); } // check for cached command first, otherwise deserialize public T deserialize(AccordCommandStore commandStore, DataInputPlus in) throws IOException { - Version version = deserializeVersion(in); - TxnId txnId = CommandSerializers.txnId.deserialize(in, version.msg_version); + AccordSerializerVersion version = deserializeVersion(in); + TxnId txnId = CommandSerializers.txnId.deserialize(in, version.msgVersion); AsyncContext context = commandStore.getContext(); T command = getCachedFull(txnId, context); if (command != null) return command; - Status status = CommandSerializers.status.deserialize(in, version.msg_version); - Txn txn = deserializeNullable(in, version.msg_version, CommandSerializers.txn); - Timestamp executeAt = deserializeNullable(in, version.msg_version, CommandSerializers.timestamp); + Status status = CommandSerializers.status.deserialize(in, version.msgVersion); + Txn txn = deserializeNullable(in, version.msgVersion, CommandSerializers.txn); + Timestamp executeAt = deserializeNullable(in, version.msgVersion, CommandSerializers.timestamp); T partial = deserializeBody(txnId, txn, executeAt, status, in, 
version); addToContext(partial, context); return partial; @@ -313,19 +286,19 @@ public class AccordPartialCommand implements PartialCommand } } - public int serializedSize(T command, Version version) + public int serializedSize(T command, AccordSerializerVersion version) { - int size = TypeSizes.sizeof(version.version); + int size = Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version)); size += CommandSerializers.txnId.serializedSize(); - size += CommandSerializers.status.serializedSize(command.status(), version.msg_version); - size += serializedSizeNullable(command.txn(), version.msg_version, CommandSerializers.txn); - size += serializedSizeNullable(command.executeAt(), version.msg_version, CommandSerializers.timestamp); + size += CommandSerializers.status.serializedSize(command.status(), version.msgVersion); + size += serializedSizeNullable(command.txn(), version.msgVersion, CommandSerializers.txn); + size += serializedSizeNullable(command.executeAt(), version.msgVersion, CommandSerializers.timestamp); return size; } public abstract T getCachedFull(TxnId txnId, AsyncContext context); public abstract void addToContext(T command, AsyncContext context); - public abstract T deserializeBody(TxnId txnId, Txn txn, Timestamp executeAt, Status status, DataInputPlus in, Version version) throws IOException; + public abstract T deserializeBody(TxnId txnId, Txn txn, Timestamp executeAt, Status status, DataInputPlus in, AccordSerializerVersion version) throws IOException; /** * Determines if current modifications require updating command data duplicated elsewhere diff --git a/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java b/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java new file mode 100644 index 0000000000..cb6f8000f1 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.io.IOException; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.MessageVersionProvider; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; + +public enum AccordSerializerVersion implements MessageVersionProvider +{ + // If MessagingService version bumps, this mapping does not need to be updated; only updates needed are those that + // include accord serializer changes. 
+ V1(1, MessagingService.VERSION_40); + + public static final AccordSerializerVersion CURRENT = V1; + public static final Serializer serializer = new Serializer(); + + public final int version; + public final int msgVersion; + + AccordSerializerVersion(int version, int msgVersion) + { + this.version = version; + this.msgVersion = msgVersion; + } + + public static AccordSerializerVersion fromVersion(int version) + { + switch (version) + { + case 1: + return V1; + default: + throw new IllegalArgumentException(); + } + } + + public static AccordSerializerVersion fromMessageVersion(int version) + { + AccordSerializerVersion[] versions = values(); + for (int i = versions.length - 1; i >= 0; i--) + { + AccordSerializerVersion v = versions[i]; + // If network version bumped (12 to 13), the accord serializers may not have been changed; use the largest + // version smaller than or equal to this version + if (v.msgVersion <= version) + return v; + } + throw new IllegalArgumentException("Attempted to use message version " + version + " which is smaller than " + versions[0] + " can handle (" + versions[0].msgVersion + ")"); + } + + @Override + public int messageVersion() + { + return msgVersion; + } + + public static class Serializer implements IVersionedSerializer<AccordSerializerVersion> + { + @Override + public void serialize(AccordSerializerVersion t, DataOutputPlus out, int version) throws IOException + { + serialize(t, out); + } + + public void serialize(AccordSerializerVersion t, DataOutputPlus out) throws IOException + { + out.writeUnsignedVInt(t.version); + } + + @Override + public AccordSerializerVersion deserialize(DataInputPlus in, int version) throws IOException + { + return deserialize(in); + } + + public AccordSerializerVersion deserialize(DataInputPlus in) throws IOException + { + return fromVersion(Math.toIntExact(in.readUnsignedVInt())); + } + + @Override + public long serializedSize(AccordSerializerVersion t, int version) + { + return serializedSize(t); + } 
+ + public long serializedSize(AccordSerializerVersion t) + { + return TypeSizes.sizeofUnsignedVInt(t.version); + } + } +} diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java index 0a04a6e550..6b3f2b8f02 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java +++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java @@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.txn.Txn; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; @@ -106,24 +105,24 @@ public class AccordStateCache private Node<?, ?> prev; private Node<?, ?> next; private int references = 0; - private long lastQueriedSize = 0; + private long lastQueriedEstimatedSizeOnHeap = 0; Node(V value) { this.value = value; } - long size() + long estimatedSizeOnHeap() { long result = EMPTY_SIZE + value.estimatedSizeOnHeap(); - lastQueriedSize = result; + lastQueriedEstimatedSizeOnHeap = result; return result; } - long sizeDelta() + long estimatedSizeOnHeapDelta() { - long prevSize = lastQueriedSize; - return size() - prevSize; + long prevSize = lastQueriedEstimatedSizeOnHeap; + return estimatedSizeOnHeap() - prevSize; } K key() @@ -184,7 +183,7 @@ public class AccordStateCache if (prev == null) { - Preconditions.checkState(head == node); + Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!"); head = next; } else @@ -194,7 +193,7 @@ public class AccordStateCache if (next == null) { - Preconditions.checkState(tail == node); + Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!"); tail = prev; } else @@ -224,15 +223,16 @@ public class AccordStateCache private void updateSize(Node<?, ?> node) { - bytesCached += 
node.sizeDelta(); + bytesCached += node.estimatedSizeOnHeapDelta(); } // don't evict if there's an outstanding save future. If an item is evicted then reloaded // before it's mutation is applied, out of date info will be loaded private boolean canEvict(Object key) { + // getFuture only returns a future if it is running, so don't need to check if its still running Future<?> future = getFuture(saveFutures, key); - return future == null || future.isDone(); + return future == null; } private void maybeEvict() @@ -255,7 +255,7 @@ public class AccordStateCache logger.trace("Evicting {} {}", evict.value.getClass().getSimpleName(), evict.key()); unlink(evict); cache.remove(evict.key()); - bytesCached -= evict.size(); + bytesCached -= evict.estimatedSizeOnHeap(); } } diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java index 2cdb099394..cc824b3890 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java @@ -139,7 +139,7 @@ public class AsyncLoader } catch (Throwable t) { - logger.error(String.format("Exception loading %s for %s", command.txnId(), callback), t); + logger.error("Exception loading {} for {}", command.txnId(), callback, t); throw t; } }); @@ -157,7 +157,7 @@ public class AsyncLoader } catch (Throwable t) { - logger.error(String.format("Exception loading %s for %s", cfk.key(), callback), t); + logger.error("Exception loading {} for {}", cfk.key(), callback, t); throw t; } }); @@ -231,7 +231,7 @@ public class AsyncLoader case FINISHED: break; default: - throw new IllegalStateException(); + throw new IllegalStateException("Unexpected state: " + state); } logger.trace("Exiting load for {} with state {}: {} {}", callback, state, txnIds, keys); diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index ca2ea8617d..5fa51425a5 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -96,7 +96,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna @Override public String toString() { - return "AsyncOperation{" + state + "}-0x" + Integer.toHexString(System.identityHashCode(this)); + return "AsyncOperation{" + state + "}-" + loggingId; } AsyncWriter createAsyncWriter(AccordCommandStore commandStore) @@ -182,20 +182,26 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna { setLoggingIds(); logger.trace("Running {} with state {}", this, state); - commandStore.checkInStoreThread(); - commandStore.setContext(context); try { - runInternal(); - } - catch (Throwable t) - { - logger.error(String.format("Operation %s failed", this), t); - tryFailure(t); + commandStore.checkInStoreThread(); + commandStore.setContext(context); + try + { + runInternal(); + } + catch (Throwable t) + { + logger.error(String.format("Operation %s failed", this), t); + tryFailure(t); + } + finally + { + commandStore.unsetContext(context); + } } finally { - commandStore.unsetContext(context); logger.trace("Exiting {}", this); clearLoggingIds(); } diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java index 750aa93e12..04a63a1670 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java @@ -65,8 +65,8 @@ public class AsyncWriter private State state = State.INITIALIZED; protected Future<?> writeFuture; private final AccordCommandStore commandStore; - AccordStateCache.Instance<TxnId, AccordCommand> commandCache; - AccordStateCache.Instance<PartitionKey, 
AccordCommandsForKey> cfkCache; + final AccordStateCache.Instance<TxnId, AccordCommand> commandCache; + final AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCache; public AsyncWriter(AccordCommandStore commandStore) { @@ -97,7 +97,8 @@ public class AsyncWriter continue; } - if (futures == null) futures = new ArrayList<>(); + if (futures == null) + futures = new ArrayList<>(); K key = item.key(); Mutation mutation = mutationFunction.apply(item, timestamp); if (logger.isTraceEnabled()) @@ -124,7 +125,8 @@ public class AsyncWriter for (AccordState.WriteOnly<K, V> item : ctxGroup.writeOnly.values()) { Preconditions.checkState(item.hasModifications()); - if (futures == null) futures = new ArrayList<>(); + if (futures == null) + futures = new ArrayList<>(); Mutation mutation = mutationFunction.apply((V) item, timestamp); Future<?> future = Stage.MUTATION.submit((Runnable) mutation::apply); future.addListener(() -> cache.purgeWriteOnly(item.key()), commandStore.executor()); @@ -238,7 +240,7 @@ public class AsyncWriter } // There won't be a txn to denormalize against until the command has been preaccepted - if (command.status().hasBeen(Status.PreAccepted) && AccordPartialCommand.WithDeps.serializer.needsUpdate(command)) + if (command.status().hasBeen(Status.PreAccepted) && AccordPartialCommand.WithDeps.serializer.needsUpdate(command) && !(command.txn() == null && command.status().isInvalidated())) { for (Key key : command.txn().keys()) { @@ -306,7 +308,7 @@ public class AsyncWriter case FINISHED: break; default: - throw new IllegalStateException(); + throw new IllegalStateException("Unexpected state: " + state); } } catch (IOException e) diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java index 26a27ed69e..7760c81e7d 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java +++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java @@ -104,7 +104,7 @@ public class AcceptSerializers public void serialize(AcceptOk acceptOk, DataOutputPlus out, int version) throws IOException { CommandSerializers.txnId.serialize(acceptOk.txnId, out, version); - CommandSerializers.deps.serialize(acceptOk.deps, out, version); + NullableSerializer.serializeNullable(acceptOk.deps, out, version, CommandSerializers.deps); } @@ -112,14 +112,14 @@ public class AcceptSerializers public AcceptOk deserialize(DataInputPlus in, int version) throws IOException { return new AcceptOk(CommandSerializers.txnId.deserialize(in, version), - CommandSerializers.deps.deserialize(in, version)); + NullableSerializer.deserializeNullable(in, version, CommandSerializers.deps)); } @Override public long serializedSize(AcceptOk acceptOk, int version) { return CommandSerializers.txnId.serializedSize(acceptOk.txnId, version) - + CommandSerializers.deps.serializedSize(acceptOk.deps, version); + + NullableSerializer.serializedSizeNullable(acceptOk.deps, version, CommandSerializers.deps); } }; diff --git a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java index 473cc27032..17a0857198 100644 --- a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java +++ b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java @@ -61,7 +61,8 @@ public class InterceptClasses implements BiFunction<String, byte[], byte[]> "|org[/.]apache[/.]cassandra[/.]db.streaming[/.].*" + "|org[/.]apache[/.]cassandra[/.]distributed[/.]impl[/.]DirectStreamingConnectionFactory.*" + "|org[/.]apache[/.]cassandra[/.]db[/.]commitlog[/.].*" + - "|org[/.]apache[/.]cassandra[/.]service[/.]paxos[/.].*"); + "|org[/.]apache[/.]cassandra[/.]service[/.]paxos[/.].*" + + "|accord[/.].*"); private static final Pattern GLOBAL_METHODS = 
Pattern.compile("org[/.]apache[/.]cassandra[/.](?!simulator[/.]).*" + "|org[/.]apache[/.]cassandra[/.]simulator[/.]test[/.].*" + diff --git a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java index 341ed63ed1..eb3df1734d 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java +++ b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java @@ -376,6 +376,7 @@ public class SimulationRunner catch (Throwable t) { logger().error("Failed on seed 0x{}", Long.toHexString(seed), t); + throw t; } } } diff --git a/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java b/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java index 5be3384eeb..23fe5eaed6 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java +++ b/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java @@ -40,9 +40,10 @@ public class SimulatorUtils public static void dumpStackTraces(Logger logger) { Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces(); - threadMap.forEach((thread, ste) -> { - logger.error("{}:\n {}", thread, Threads.prettyPrint(ste, false, " ", "\n", "")); - }); + String prefix = " "; + String delimiter = "\n" + prefix; + threadMap.forEach((thread, ste) -> + logger.error("{}:\n{}", thread, Threads.prettyPrint(ste, false, prefix, delimiter, ""))); FastThreadLocal.destroy(); } } diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java index 503afafc38..1351117999 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.service.accord.async; import java.util.Collections; -import java.util.List; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -66,20 +65,6 @@ public class AsyncLoaderTest StorageService.instance.initServer(); } - private static void load(AccordCommandStore commandStore, AsyncContext context, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys) - { - execute(commandStore, () -> - { - AsyncLoader loader = new AsyncLoader(commandStore, txnIds, keys); - while (!loader.load(context, (o, t) -> Assert.assertNull(t))); - }); - } - - private static void load(AccordCommandStore commandStore, AsyncContext context, TxnId... txnIds) - { - load(commandStore, context, ImmutableList.copyOf(txnIds), Collections.emptyList()); - } - /** * Loading a cached resource shoudln't block */ @@ -118,7 +103,7 @@ public class AsyncLoaderTest * Loading a cached resource should block */ @Test - public void loadTest() throws Throwable + public void loadTest() { AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); @@ -160,7 +145,7 @@ public class AsyncLoaderTest * Test when some resources are cached and others need to be loaded */ @Test - public void partialLoadTest() throws Throwable + public void partialLoadTest() { AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java index 6ec036aa7b..4d788b6865 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import accord.local.Command; import accord.local.PartialCommand; -import accord.local.PreLoadContext; import accord.local.Status; import accord.primitives.Timestamp; import 
accord.primitives.TxnId; @@ -77,7 +76,7 @@ public class AsyncWriterTest } @Test - public void waitingOnDenormalization() throws Throwable + public void waitingOnDenormalization() { AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
