This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new d4c308a8a6 perf improvements d4c308a8a6 is described below commit d4c308a8a68ff892e4bbebf673b3d924a5313e52 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Feb 28 15:50:04 2024 +0000 perf improvements --- modules/accord | 2 +- .../org/apache/cassandra/config/AccordSpec.java | 4 +- src/java/org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 22 +++ .../db/compaction/CompactionIterator.java | 6 +- src/java/org/apache/cassandra/net/Verb.java | 32 +-- .../service/accord/AccordCachingState.java | 4 +- .../service/accord/AccordCommandStore.java | 20 +- .../service/accord/AccordCommandStores.java | 8 +- .../service/accord/AccordConfiguration.java | 1 + .../cassandra/service/accord/AccordJournal.java | 15 ++ .../cassandra/service/accord/AccordKeyspace.java | 19 +- .../service/accord/AccordObjectSizes.java | 29 +-- .../service/accord/AccordSafeCommandStore.java | 50 +++-- .../service/accord/AccordSafeCommandsForKey.java | 13 +- .../cassandra/service/accord/AccordSafeState.java | 2 + .../service/accord/AccordSafeTimestampsForKey.java | 9 +- .../cassandra/service/accord/AccordService.java | 8 +- .../service/accord/CommandsForRanges.java | 11 +- .../cassandra/service/accord/api/AccordAgent.java | 3 + .../service/accord/async/AsyncLoader.java | 12 +- .../service/accord/async/AsyncOperation.java | 5 +- .../service/accord/interop/AccordInteropApply.java | 2 - .../serializers/CommandsForKeySerializer.java | 217 +++++++++++---------- .../accord/serializers/WaitingOnSerializer.java | 103 +++++++--- .../cassandra/service/accord/txn/TxnWrite.java | 5 +- .../apache/cassandra/utils/vint/VIntCoding.java | 47 ++++- test/conf/logback-dtest-quiet.xml | 56 ++++++ .../cassandra/distributed/api/ICoordinator.java | 3 + .../cassandra/distributed/impl/Coordinator.java | 25 +++ .../cassandra/distributed/impl/Instance.java | 2 +- .../distributed/test/accord/AccordLoadTest.java | 96 +++++++++ .../compaction/CompactionAccordIteratorsTest.java | 2 +- .../service/accord/AccordCommandStoreTest.java | 29 +-- .../service/accord/AccordCommandTest.java | 8 +- .../cassandra/service/accord/AccordTestUtils.java | 12 +- .../service/accord/async/AsyncLoaderTest.java | 9 +- .../service/accord/async/AsyncOperationTest.java | 11 +- .../serializers/CommandsForKeySerializerTest.java | 39 ++-- .../serializers/WaitingOnSerializerTest.java | 33 ++-- 40 files changed, 654 insertions(+), 321 deletions(-) diff --git a/modules/accord b/modules/accord index ef36616441..3562bb3c9c 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit ef36616441bd4ff4fec5379d986c75ad5a62ff7d +Subproject commit 3562bb3c9ce4e9eecdf65e236e968ef3ee9e0a86 diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index 697d7edc1e..e76745a233 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -47,9 +47,9 @@ public class AccordSpec public volatile DurationSpec fast_path_update_delay = new DurationSpec.IntSecondsBound(5); - public volatile DurationSpec schedule_durability_frequency = new DurationSpec.IntSecondsBound(15); + public volatile DurationSpec schedule_durability_frequency = new DurationSpec.IntSecondsBound(5); public volatile DurationSpec durability_txnid_lag = new DurationSpec.IntSecondsBound(5); - public volatile DurationSpec shard_durability_cycle = new DurationSpec.IntMinutesBound(2); + public volatile DurationSpec shard_durability_cycle = new DurationSpec.IntMinutesBound(1); public volatile DurationSpec global_durability_cycle = new DurationSpec.IntMinutesBound(10); public enum TransactionalRangeMigration diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 338ae13981..8a7a9d7d35 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -480,6 +480,7 @@ public class Config public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE; public DataStorageSpec.LongMebibytesBound paxos_cache_size = null; + public DataStorageSpec.LongMebibytesBound accord_cache_size = null; public DataStorageSpec.LongMebibytesBound consensus_migration_cache_size = null; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 3922a94c93..7273f3c5c7 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -210,6 +210,7 @@ public class DatabaseDescriptor private static long keyCacheSizeInMiB; private static long paxosCacheSizeInMiB; + private static long accordCacheSizeInMiB; private static long consensusMigrationCacheSizeInMiB; private static long counterCacheSizeInMiB; private static long indexSummaryCapacityInMiB; @@ -894,6 +895,22 @@ public class DatabaseDescriptor + conf.paxos_cache_size + "', supported values are <integer> >= 0.", false); } + try + { + // if paxosCacheSizeInMiB option was set to "auto" then size of the cache should be "max(10% of Heap (in MB), 1MB) + accordCacheSizeInMiB = (conf.accord_cache_size == null) + ? Math.max(1, (int) ((Runtime.getRuntime().totalMemory() * 0.10) / 1024 / 1024)) + : conf.accord_cache_size.toMebibytes(); + + if (accordCacheSizeInMiB < 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("paxos_cache_size option was set incorrectly to '" + + conf.paxos_cache_size + "', supported values are <integer> >= 0.", false); + } + try { // if consensusMigrationCacheSizeInMiB option was set to "auto" then size of the cache should be "min(1% of Heap (in MB), 50MB) @@ -3863,6 +3880,11 @@ public class DatabaseDescriptor return paxosCacheSizeInMiB; } + public static long getAccordCacheSizeInMiB() + { + return accordCacheSizeInMiB; + } + public static long getConsensusMigrationCacheSizeInMiB() { return consensusMigrationCacheSizeInMiB; diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index bb404bbe5a..e0dcb5682f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -978,11 +978,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte if (redundantBeforeEntry == null) return row; - TxnId redundantBeforeTxnId = redundantBeforeEntry.shardRedundantBefore(); - if (redundantBeforeTxnId.equals(TxnId.NONE)) - return row; - - return CommandsForKeysAccessor.withoutRedundantCommands(partitionKey, row, redundantBeforeTxnId); + return CommandsForKeysAccessor.withoutRedundantCommands(partitionKey, row, redundantBeforeEntry); } @Override diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 457bd14c80..4fed5c618f 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -266,7 +266,7 @@ public enum Verb PAXOS2_PREPARE_REQ (40, P2, writeTimeout, MUTATION, () -> PaxosPrepare.requestSerializer, () -> PaxosPrepare.requestHandler, PAXOS2_PREPARE_RSP ), PAXOS2_PREPARE_REFRESH_RSP (51, P2, writeTimeout, REQUEST_RESPONSE, () -> PaxosPrepareRefresh.responseSerializer, RESPONSE_HANDLER ), PAXOS2_PREPARE_REFRESH_REQ (41, P2, writeTimeout, MUTATION, () -> PaxosPrepareRefresh.requestSerializer, () -> PaxosPrepareRefresh.requestHandler, PAXOS2_PREPARE_REFRESH_RSP ), - PAXOS2_PROPOSE_RSP (52, P2, writeTimeout, REQUEST_RESPONSE, () -> PaxosPropose.ACCEPT_RESULT_SERIALIZER, RESPONSE_HANDLER ), + PAXOS2_PROPOSE_RSP (52, P2, writeTimeout, REQUEST_RESPONSE, () -> PaxosPropose.ACCEPT_RESULT_SERIALIZER, RESPONSE_HANDLER ), PAXOS2_PROPOSE_REQ (42, P2, writeTimeout, MUTATION, () -> PaxosPropose.requestSerializer, () -> PaxosPropose.requestHandler, PAXOS2_PROPOSE_RSP ), PAXOS2_COMMIT_AND_PREPARE_RSP (53, P2, writeTimeout, REQUEST_RESPONSE, () -> PaxosPrepare.responseSerializer, RESPONSE_HANDLER ), PAXOS2_COMMIT_AND_PREPARE_REQ (43, P2, writeTimeout, MUTATION, () -> PaxosCommitAndPrepare.requestSerializer, () -> PaxosCommitAndPrepare.requestHandler, PAXOS2_COMMIT_AND_PREPARE_RSP ), @@ -305,41 +305,41 @@ public enum Verb DATA_MOVEMENT_EXECUTED_REQ (817, P1, rpcTimeout, MISC, () -> DataMovement.Status.serializer, () -> DataMovements.instance, DATA_MOVEMENT_EXECUTED_RSP ), // accord - ACCORD_SIMPLE_RSP (119, P2, writeTimeout, REQUEST_RESPONSE, () -> EnumSerializer.simpleReply, RESPONSE_HANDLER ), - ACCORD_PRE_ACCEPT_RSP (120, P2, writeTimeout, REQUEST_RESPONSE, () -> PreacceptSerializers.reply, RESPONSE_HANDLER ), + ACCORD_SIMPLE_RSP (119, P2, writeTimeout, IMMEDIATE, () -> EnumSerializer.simpleReply, RESPONSE_HANDLER ), + ACCORD_PRE_ACCEPT_RSP (120, P2, writeTimeout, IMMEDIATE, () -> PreacceptSerializers.reply, RESPONSE_HANDLER ), ACCORD_PRE_ACCEPT_REQ (121, P2, writeTimeout, IMMEDIATE, () -> PreacceptSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_PRE_ACCEPT_RSP ), - ACCORD_ACCEPT_RSP (122, P2, writeTimeout, REQUEST_RESPONSE, () -> AcceptSerializers.reply, RESPONSE_HANDLER ), + ACCORD_ACCEPT_RSP (122, P2, writeTimeout, IMMEDIATE, () -> AcceptSerializers.reply, RESPONSE_HANDLER ), ACCORD_ACCEPT_REQ (123, P2, writeTimeout, IMMEDIATE, () -> AcceptSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_ACCEPT_RSP ), ACCORD_ACCEPT_INVALIDATE_REQ (124, P2, writeTimeout, IMMEDIATE, () -> AcceptSerializers.invalidate, AccordService::verbHandlerOrNoop, ACCORD_ACCEPT_RSP ), - ACCORD_READ_RSP (125, P2, writeTimeout, REQUEST_RESPONSE, () -> ReadDataSerializers.reply, RESPONSE_HANDLER ), + ACCORD_READ_RSP (125, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.reply, RESPONSE_HANDLER ), ACCORD_READ_REQ (126, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.readData, AccordService::verbHandlerOrNoop, ACCORD_READ_RSP ), ACCORD_COMMIT_REQ (127, P2, writeTimeout, IMMEDIATE, () -> CommitSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_READ_RSP ), ACCORD_COMMIT_INVALIDATE_REQ (128, P2, writeTimeout, IMMEDIATE, () -> CommitSerializers.invalidate, AccordService::verbHandlerOrNoop ), - ACCORD_APPLY_RSP (129, P2, writeTimeout, REQUEST_RESPONSE, () -> ApplySerializers.reply, RESPONSE_HANDLER ), - ACCORD_APPLY_REQ (130, P2, writeTimeout, IMMEDIATE, () -> ApplySerializers.request, AccordService::verbHandlerOrNoop, ACCORD_APPLY_RSP ), - ACCORD_BEGIN_RECOVER_RSP (131, P2, writeTimeout, REQUEST_RESPONSE, () -> RecoverySerializers.reply, RESPONSE_HANDLER ), + ACCORD_APPLY_RSP (129, P2, writeTimeout, IMMEDIATE, () -> ApplySerializers.reply, RESPONSE_HANDLER ), + ACCORD_APPLY_REQ (130, P2, writeTimeout, IMMEDIATE, () -> ApplySerializers.request, AccordService::verbHandlerOrNoop, ACCORD_APPLY_RSP ), + ACCORD_BEGIN_RECOVER_RSP (131, P2, writeTimeout, IMMEDIATE, () -> RecoverySerializers.reply, RESPONSE_HANDLER ), ACCORD_BEGIN_RECOVER_REQ (132, P2, writeTimeout, IMMEDIATE, () -> RecoverySerializers.request, AccordService::verbHandlerOrNoop, ACCORD_BEGIN_RECOVER_RSP ), - ACCORD_BEGIN_INVALIDATE_RSP (133, P2, writeTimeout, REQUEST_RESPONSE, () -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER ), + ACCORD_BEGIN_INVALIDATE_RSP (133, P2, writeTimeout, IMMEDIATE, () -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER ), ACCORD_BEGIN_INVALIDATE_REQ (134, P2, writeTimeout, IMMEDIATE, () -> BeginInvalidationSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_BEGIN_INVALIDATE_RSP ), - ACCORD_WAIT_ON_COMMIT_RSP (136, P2, writeTimeout, REQUEST_RESPONSE, () -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER ), + ACCORD_WAIT_ON_COMMIT_RSP (136, P2, writeTimeout, IMMEDIATE, () -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER ), ACCORD_WAIT_ON_COMMIT_REQ (135, P2, writeTimeout, IMMEDIATE, () -> WaitOnCommitSerializer.request, AccordService::verbHandlerOrNoop, ACCORD_WAIT_ON_COMMIT_RSP ), ACCORD_WAIT_UNTIL_APPLIED_REQ (137, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.waitUntilApplied, AccordService::verbHandlerOrNoop, ACCORD_READ_RSP ), ACCORD_INFORM_OF_TXN_REQ (138, P2, writeTimeout, IMMEDIATE, () -> InformOfTxnIdSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), ACCORD_INFORM_HOME_DURABLE_REQ (139, P2, writeTimeout, IMMEDIATE, () -> InformHomeDurableSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), ACCORD_INFORM_DURABLE_REQ (140, P2, writeTimeout, IMMEDIATE, () -> InformDurableSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), - ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, REQUEST_RESPONSE, () -> CheckStatusSerializers.reply, RESPONSE_HANDLER ), + ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, IMMEDIATE, () -> CheckStatusSerializers.reply, RESPONSE_HANDLER ), ACCORD_CHECK_STATUS_REQ (142, P2, writeTimeout, IMMEDIATE, () -> CheckStatusSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_CHECK_STATUS_RSP ), - ACCORD_GET_DEPS_RSP (143, P2, writeTimeout, REQUEST_RESPONSE, () -> GetDepsSerializers.reply, RESPONSE_HANDLER ), + ACCORD_GET_DEPS_RSP (143, P2, writeTimeout, IMMEDIATE, () -> GetDepsSerializers.reply, RESPONSE_HANDLER ), ACCORD_GET_DEPS_REQ (144, P2, writeTimeout, IMMEDIATE, () -> GetDepsSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_GET_DEPS_RSP ), - ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, REQUEST_RESPONSE, () -> GetEphmrlReadDepsSerializers.reply, RESPONSE_HANDLER ), + ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.reply, RESPONSE_HANDLER ), ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_GET_EPHMRL_READ_DEPS_RSP), - ACCORD_GET_MAX_CONFLICT_RSP (163, P2, writeTimeout, REQUEST_RESPONSE, () -> GetMaxConflictSerializers.reply, RESPONSE_HANDLER ), + ACCORD_GET_MAX_CONFLICT_RSP (163, P2, writeTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.reply, RESPONSE_HANDLER ), ACCORD_GET_MAX_CONFLICT_REQ (164, P2, writeTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_GET_MAX_CONFLICT_RSP), - ACCORD_FETCH_DATA_RSP (145, P2, repairTimeout,REQUEST_RESPONSE, () -> FetchSerializers.reply, RESPONSE_HANDLER ), + ACCORD_FETCH_DATA_RSP (145, P2, repairTimeout,IMMEDIATE, () -> FetchSerializers.reply, RESPONSE_HANDLER ), ACCORD_FETCH_DATA_REQ (146, P2, repairTimeout,IMMEDIATE, () -> FetchSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_FETCH_DATA_RSP ), ACCORD_SET_SHARD_DURABLE_REQ (147, P2, writeTimeout, IMMEDIATE, () -> SetDurableSerializers.shardDurable, AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, IMMEDIATE, () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), - ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, REQUEST_RESPONSE, () -> QueryDurableBeforeSerializers.reply, RESPONSE_HANDLER ), + ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE, () -> QueryDurableBeforeSerializers.reply, RESPONSE_HANDLER ), ACCORD_QUERY_DURABLE_BEFORE_REQ (150, P2, writeTimeout, IMMEDIATE, () -> QueryDurableBeforeSerializers.request,AccordService::verbHandlerOrNoop, ACCORD_QUERY_DURABLE_BEFORE_RSP ), ACCORD_SYNC_NOTIFY_REQ (151, P2, writeTimeout, IMMEDIATE, () -> Notification.listSerializer, () -> AccordSyncPropagator.verbHandler, ACCORD_SIMPLE_RSP ), diff --git a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java index 5175e86c10..d07dcfc87b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java @@ -123,7 +123,7 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode int estimatedSizeOnHeap(ToLongFunction<V> estimator) { - shouldUpdateSize = false; + shouldUpdateSize = false; // TODO (expected): probably not the safest place to clear need to compute size return lastQueriedEstimatedSizeOnHeap = Ints.checkedCast(EMPTY_SIZE + estimateStateOnHeapSize(estimator)); } @@ -204,7 +204,7 @@ public class AccordCachingState<K, V> extends IntrusiveLinkedListNode protected State<K, V> state(State<K, V> next) { State<K, V> prev = state; - if (prev != next) + if (prev != next) // TODO (expected): we change state to transition the cache state machine but often keep payload the same - so shouldn't recompute shouldUpdateSize = true; return state = next; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 36b224b022..4b0722606b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -42,7 +42,7 @@ import accord.api.Agent; import accord.api.DataStore; import accord.api.Key; import accord.api.ProgressLog; -import accord.impl.CommandsForKey; +import accord.local.CommandsForKey; import accord.impl.CommandsSummary; import accord.impl.TimestampsForKey; import accord.local.Command; @@ -114,8 +114,8 @@ public class AccordCommandStore extends CommandStore implements CacheSize private final ExecutionOrder executionOrder; private final AccordStateCache stateCache; private final AccordStateCache.Instance<TxnId, Command, AccordSafeCommand> commandCache; - private final AccordStateCache.Instance<RoutableKey, TimestampsForKey, AccordSafeTimestampsForKey> timestampsForKeyCache; - private final AccordStateCache.Instance<RoutableKey, CommandsForKey, AccordSafeCommandsForKey> commandsForKeyCache; + private final AccordStateCache.Instance<Key, TimestampsForKey, AccordSafeTimestampsForKey> timestampsForKeyCache; + private final AccordStateCache.Instance<Key, CommandsForKey, AccordSafeCommandsForKey> commandsForKeyCache; private AsyncOperation<?> currentOperation = null; private AccordSafeCommandStore current = null; private long lastSystemTimestampMicros = Long.MIN_VALUE; @@ -161,7 +161,7 @@ public class AccordCommandStore extends CommandStore implements CacheSize this::validateCommand, AccordObjectSizes::command); timestampsForKeyCache = - stateCache.instance(RoutableKey.class, + stateCache.instance(Key.class, AccordSafeTimestampsForKey.class, AccordSafeTimestampsForKey::new, this::loadTimestampsForKey, @@ -169,7 +169,7 @@ public class AccordCommandStore extends CommandStore implements CacheSize this::validateTimestampsForKey, AccordObjectSizes::timestampsForKey); commandsForKeyCache = - stateCache.instance(RoutableKey.class, + stateCache.instance(Key.class, AccordSafeCommandsForKey.class, AccordSafeCommandsForKey::new, this::loadCommandsForKey, @@ -220,7 +220,7 @@ public class AccordCommandStore extends CommandStore implements CacheSize MessageProvider messageProvider = journal.makeMessageProvider(txnId); - SerializerSupport.TxnAndDeps txnAndDeps = SerializerSupport.extractTxnAndDeps(status, accepted, messageProvider); + SerializerSupport.TxnAndDeps txnAndDeps = SerializerSupport.extractTxnAndDeps(unsafeRangesForEpoch(), status, accepted, messageProvider); Seekables<?, ?> keys = txnAndDeps.txn.keys(); if (keys.domain() != Routable.Domain.Range) throw new AssertionError(String.format("Txn keys are not range for %s", txnAndDeps.txn)); @@ -311,12 +311,12 @@ public class AccordCommandStore extends CommandStore implements CacheSize return commandCache; } - public AccordStateCache.Instance<RoutableKey, TimestampsForKey, AccordSafeTimestampsForKey> timestampsForKeyCache() + public AccordStateCache.Instance<Key, TimestampsForKey, AccordSafeTimestampsForKey> timestampsForKeyCache() { return timestampsForKeyCache; } - public AccordStateCache.Instance<RoutableKey, CommandsForKey, AccordSafeCommandsForKey> commandsForKeyCache() + public AccordStateCache.Instance<Key, CommandsForKey, AccordSafeCommandsForKey> commandsForKeyCache() { return commandsForKeyCache; } @@ -466,8 +466,8 @@ public class AccordCommandStore extends CommandStore implements CacheSize public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext, Map<TxnId, AccordSafeCommand> commands, - NavigableMap<RoutableKey, AccordSafeTimestampsForKey> timestampsForKeys, - NavigableMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys) + NavigableMap<Key, AccordSafeTimestampsForKey> timestampsForKeys, + NavigableMap<Key, AccordSafeCommandsForKey> commandsForKeys) { Invariants.checkState(current == null); commands.values().forEach(AccordSafeState::preExecute); diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index cf4cda992b..0fd719d5fb 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -31,6 +31,7 @@ import accord.primitives.Range; import accord.topology.Topology; import accord.utils.RandomSource; import org.apache.cassandra.cache.CacheSize; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.metrics.AccordStateCacheMetrics; import org.apache.cassandra.metrics.CacheSizeMetrics; import org.apache.cassandra.schema.TableId; @@ -47,7 +48,7 @@ public class AccordCommandStores extends CommandStores implements CacheSize ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, AccordJournal journal) { super(time, agent, store, random, shardDistributor, progressLogFactory, AccordCommandStore.factory(journal, new AccordStateCacheMetrics(ACCORD_STATE_CACHE))); - setCapacity(maxCacheSize()); + setCapacity(DatabaseDescriptor.getAccordCacheSizeInMiB() << 20); this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this); } @@ -110,11 +111,6 @@ public class AccordCommandStores extends CommandStores implements CacheSize forEach(commandStore -> ((AccordSafeCommandStore) commandStore).commandStore().setCapacity(perStore)); } - private static long maxCacheSize() - { - return 5 << 20; // TODO (required): make configurable - } - @Override public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology, boolean startSync) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java b/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java index a17a9fc844..d87e6c96cd 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java @@ -23,6 +23,7 @@ import java.time.Duration; import accord.config.LocalConfig; import org.apache.cassandra.config.Config; +// TODO (expected): should this be merged with AccordSpec? public class AccordConfiguration implements LocalConfig { private final Config config; diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index ae202d2f63..26e09ada14 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -570,6 +571,7 @@ public class AccordJournal implements Shutdownable Key(Timestamp timestamp, Type type) { + if (timestamp == null) throw new NullPointerException("Null timestamp for type " + type); this.timestamp = timestamp; this.type = type; } @@ -1361,6 +1363,19 @@ public class AccordJournal implements Shutdownable return presentMessages; } + public Set<MessageType> all() + { + Set<Type> types = EnumSet.allOf(Type.class); + Set<Key> keys = new ObjectHashSet<>(types.size() + 1, 0.9f); + for (Type type : types) + keys.add(new Key(txnId, type)); + Set<Key> presentKeys = journal.test(keys); + Set<MessageType> presentMessages = new ObjectHashSet<>(presentKeys.size() + 1, 0.9f); + for (Key key : presentKeys) + presentMessages.add(key.type.outgoingType); + return presentMessages; + } + @Override public PreAccept preAccept() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 2684be683f..535bc4948f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -45,7 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Key; -import accord.impl.CommandsForKey; +import accord.local.CommandsForKey; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.Command.WaitingOn; @@ -592,7 +592,7 @@ public class AccordKeyspace } // TODO (expected): garbage-free filtering, reusing encoding - public Row withoutRedundantCommands(PartitionKey key, Row row, TxnId redundantBefore) + public Row withoutRedundantCommands(PartitionKey key, Row row, RedundantBefore.Entry redundantBefore) { Invariants.checkState(row.columnCount() == 1); Cell<?> cell = row.getCell(data); @@ -600,7 +600,10 @@ public class AccordKeyspace return row; CommandsForKey current = CommandsForKeySerializer.fromBytes(key, cell.buffer()); - CommandsForKey updated = current.withoutRedundant(redundantBefore); + if (current == null) + return null; + + CommandsForKey updated = current.withRedundantBefore(redundantBefore); if (current == updated) return row; @@ -822,7 +825,7 @@ public class AccordKeyspace Command.Committed committed = command.asCommitted(); Command.Committed originalCommitted = original != null && original.isCommitted() ? original.asCommitted() : null; if (originalCommitted == null || committed.waitingOn != originalCommitted.waitingOn) - builder.addCell(live(CommandsColumns.waiting_on, timestampMicros, WaitingOnSerializer.serialize(committed.waitingOn))); + builder.addCell(live(CommandsColumns.waiting_on, timestampMicros, WaitingOnSerializer.serialize(committed.txnId(), committed.waitingOn))); } Row row = builder.build(); @@ -1189,10 +1192,10 @@ public class AccordKeyspace Ballot promised = deserializePromisedOrNull(row); Ballot accepted = deserializeAcceptedOrNull(row); - WaitingOnProvider waitingOn = deserializeWaitingOn(row); + WaitingOnProvider waitingOn = deserializeWaitingOn(txnId, row); MessageProvider messages = commandStore.makeMessageProvider(txnId); - return SerializerSupport.reconstruct(attrs, status, executeAt, promised, accepted, waitingOn, messages); + return SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs, status, executeAt, promised, accepted, waitingOn, messages); } catch (Throwable t) { @@ -1270,7 +1273,7 @@ public class AccordKeyspace return deserializeTimestampOrNull(row.getBlob("accepted_ballot"), Ballot::fromBits); } - private static WaitingOnProvider deserializeWaitingOn(UntypedResultSet.Row row) + private static WaitingOnProvider deserializeWaitingOn(TxnId txnId, UntypedResultSet.Row row) { ByteBuffer bytes = row.getBlob("waiting_on"); @@ -1284,7 +1287,7 @@ public class AccordKeyspace try { - return WaitingOnSerializer.deserialize(deps, bytes); + return WaitingOnSerializer.deserialize(txnId, deps.keyDeps.keys(), deps.rangeDeps.txnIds(), bytes); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index a1704ba52b..7346a6eebf 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -24,11 +24,12 @@ import java.util.function.ToLongFunction; import accord.api.Key; import accord.api.Result; import accord.api.RoutingKey; -import accord.impl.CommandsForKey; -import accord.impl.CommandsForKey.Info; +import accord.local.CommandsForKey; +import accord.local.CommandsForKey.TxnInfo; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.Command.WaitingOn; +import accord.local.CommandsForKey.TxnInfoWithMissing; import accord.local.CommonAttributes; import accord.local.Node; import accord.local.SaveStatus; @@ -305,7 +306,6 @@ public class AccordObjectSizes return ACCEPTED; case Committed: case Stable: - case ReadyToExecute: return COMMITTED; case PreApplied: case Applied: @@ -363,22 +363,25 @@ public class AccordObjectSizes } private static long EMPTY_CFK_SIZE = measure(new CommandsForKey(null)); - private static long EMPTY_INFO_SIZE = measure(CommandsForKey.Info.createMock(null, null, null)); + private static long EMPTY_INFO_SIZE = measure(TxnInfo.createMock(TxnId.NONE, null, null, null)); + private static long EMPTY_INFO_WITH_MISSING_ADDITIONAL_SIZE = measure(TxnInfo.createMock(TxnId.NONE, null, null, null)) - EMPTY_INFO_SIZE; public static long commandsForKey(CommandsForKey cfk) { long size = EMPTY_CFK_SIZE; size += key(cfk.key()); - size += 2 * ObjectSizes.sizeOfReferenceArray(cfk.size()); - size += cfk.size() * TIMESTAMP_SIZE; + size += ObjectSizes.sizeOfReferenceArray(cfk.size()); + size += cfk.size() * EMPTY_INFO_SIZE; for (int i = 0 ; i < cfk.size() ; ++i) { - Info info = cfk.info(i); - if (info.getClass() == CommandsForKey.NoInfo.class) - continue; - - size += EMPTY_INFO_SIZE; - if (info.missing.length > 0) - size += ObjectSizes.sizeOfReferenceArray(info.missing.length); + TxnInfo info = cfk.get(i); + if (info.getClass() != TxnInfoWithMissing.class) continue; + TxnInfoWithMissing infoWithMissing = (TxnInfoWithMissing) info; + if (infoWithMissing.missing.length > 0) + { + size += EMPTY_INFO_WITH_MISSING_ADDITIONAL_SIZE; + size += ObjectSizes.sizeOfReferenceArray(infoWithMissing.missing.length); + size += infoWithMissing.missing.length * TIMESTAMP_SIZE; + } } return size; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index c1b09ccb27..1c497cc5c9 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -31,8 +31,7 @@ import accord.api.DataStore; import accord.api.Key; import accord.api.ProgressLog; import accord.impl.AbstractSafeCommandStore; -import accord.impl.CommandsForKey; -import accord.impl.CommandsForKeys; +import accord.local.CommandsForKey; import accord.impl.CommandsSummary; import accord.local.Command; import accord.local.CommandStores.RangesForEpoch; @@ -41,7 +40,6 @@ import accord.local.PreLoadContext; import accord.primitives.AbstractKeys; import accord.primitives.Deps; import accord.primitives.Ranges; -import accord.primitives.RoutableKey; import accord.primitives.Routables; import accord.primitives.Seekables; import accord.primitives.Timestamp; @@ -53,16 +51,16 @@ import static accord.primitives.Routable.Domain.Range; public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, AccordSafeCommandsForKey> { private final Map<TxnId, AccordSafeCommand> commands; - private final NavigableMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys; - private final NavigableMap<RoutableKey, AccordSafeTimestampsForKey> timestampsForKeys; + private final NavigableMap<Key, AccordSafeCommandsForKey> commandsForKeys; + private final NavigableMap<Key, AccordSafeTimestampsForKey> timestampsForKeys; private final AccordCommandStore commandStore; private final RangesForEpoch ranges; CommandsForRanges.Updater rangeUpdates = null; public AccordSafeCommandStore(PreLoadContext context, Map<TxnId, AccordSafeCommand> commands, - NavigableMap<RoutableKey, AccordSafeTimestampsForKey> timestampsForKey, - NavigableMap<RoutableKey, AccordSafeCommandsForKey> commandsForKey, + NavigableMap<Key, AccordSafeTimestampsForKey> timestampsForKey, + NavigableMap<Key, AccordSafeCommandsForKey> commandsForKey, AccordCommandStore commandStore) { super(context); @@ -94,7 +92,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC } @Override - protected AccordSafeCommandsForKey getCommandsForKeyInternal(RoutableKey key) + protected AccordSafeCommandsForKey getCommandsForKeyInternal(Key key) { return commandsForKeys.get(key); } @@ -106,7 +104,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC } @Override - protected AccordSafeCommandsForKey getCommandsForKeyIfLoaded(RoutableKey key) + protected AccordSafeCommandsForKey getCommandsForKeyIfLoaded(Key key) { AccordSafeCommandsForKey cfk = commandStore.commandsForKeyCache().acquireIfLoaded(key); if (cfk != null) cfk.preExecute(); @@ -114,7 +112,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC } @Override - protected AccordSafeTimestampsForKey getTimestampsForKeyInternal(RoutableKey key) + protected AccordSafeTimestampsForKey getTimestampsForKeyInternal(Key key) { return timestampsForKeys.get(key); } @@ -126,7 +124,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC } @Override - protected AccordSafeTimestampsForKey getTimestampsForKeyIfLoaded(RoutableKey key) + protected AccordSafeTimestampsForKey getTimestampsForKeyIfLoaded(Key key) { AccordSafeTimestampsForKey cfk = commandStore.timestampsForKeyCache().acquireIfLoaded(key); if (cfk != null) cfk.preExecute(); @@ -177,14 +175,14 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC Ranges allRanges = ranges.all(); deps.keyDeps.keys().forEach(allRanges, key -> { // TODO (now): batch register to minimise GC - deps.keyDeps.forEach(key, txnId -> { + deps.keyDeps.forEach(key, (txnId, txnIdx) -> { // TODO (desired, efficiency): this can be made more efficient by batching by epoch if (ranges.coordinates(txnId).contains(key)) return; // already coordinates, no need to replicate if (!ranges.allBefore(txnId.epoch()).contains(key)) return; - CommandsForKeys.registerNotWitnessed(this, key, txnId); + get(key).registerHistorical(this, txnId); }); }); CommandsForRanges commandsForRanges = commandStore.commandsForRanges(); @@ -221,7 +219,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC for (Key key : keys) { if (!slice.contains(key)) continue; - CommandsForKey commands = commandsForKey(key).current(); + CommandsForKey commands = get(key).current(); accumulate = map.apply(commands, accumulate); } } @@ -233,11 +231,11 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC Routables<?> sliced = keysOrRanges.slice(slice, Routables.Slice.Minimal); if (!context.keys().slice(slice, Routables.Slice.Minimal).containsAll(sliced)) throw new AssertionError("Range(s) detected not present in the PreLoadContext: expected " + context.keys() + " but given " + keysOrRanges); - for (RoutableKey key : timestampsForKeys.keySet()) + for (Key key : commandsForKeys.keySet()) { //TODO (duplicate code): this is a repeat of Key... only change is checking contains in range if (!sliced.contains(key)) continue; - CommandsForKey commands = commandsForKey(key).current(); + CommandsForKey commands = get(key).current(); accumulate = map.apply(commands, accumulate); } } @@ -263,22 +261,20 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC } @Override - protected void update(Command prev, Command updated, @Nullable Seekables<?, ?> keysOrRanges) + protected void update(Command prev, Command updated) { - super.update(prev, updated, keysOrRanges); + super.update(prev, updated); if (updated.txnId().domain() == Range && CommandsForKey.needsUpdate(prev, updated)) { + Seekables<?, ?> keysOrRanges = updated.keysOrRanges(); + if (keysOrRanges == null) keysOrRanges = prev.keysOrRanges(); if (keysOrRanges == null) - { - if (updated.known().isDefinitionKnown()) keysOrRanges = updated.partialTxn().keys(); - else if (prev.known().isDefinitionKnown()) keysOrRanges = prev.partialTxn().keys(); - else return; - } - List<TxnId> waitingOn; + return; - if (updated.partialDeps() == null) waitingOn = Collections.emptyList(); + List<TxnId> waitingOn; // TODO (required): this is faulty: we cannot simply save the raw transaction ids, as they may be for other ranges + if (updated.partialDeps() == null) waitingOn = Collections.emptyList(); else waitingOn = updated.partialDeps().txnIds(); updateRanges().put(updated.txnId(), (Ranges)keysOrRanges, updated.saveStatus(), updated.executeAt(), waitingOn); } @@ -300,8 +296,8 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC } public void postExecute(Map<TxnId, AccordSafeCommand> commands, - Map<RoutableKey, AccordSafeTimestampsForKey> timestampsForKey, - Map<RoutableKey, AccordSafeCommandsForKey> commandsForKeys + Map<Key, AccordSafeTimestampsForKey> timestampsForKey, + Map<Key, AccordSafeCommandsForKey> commandsForKeys ) { postExecute(); diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java index 748143f333..808b4d4bc1 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java @@ -23,18 +23,17 @@ import java.util.Objects; import com.google.common.annotations.VisibleForTesting; import accord.api.Key; -import accord.impl.CommandsForKey; -import accord.impl.SafeCommandsForKey; -import accord.primitives.RoutableKey; +import accord.local.CommandsForKey; +import accord.local.SafeCommandsForKey; -public class AccordSafeCommandsForKey extends SafeCommandsForKey implements AccordSafeState<RoutableKey, CommandsForKey> +public class AccordSafeCommandsForKey extends SafeCommandsForKey implements AccordSafeState<Key, CommandsForKey> { private boolean invalidated; - private final AccordCachingState<RoutableKey, CommandsForKey> global; + private final AccordCachingState<Key, CommandsForKey> global; private CommandsForKey original; private CommandsForKey current; - public AccordSafeCommandsForKey(AccordCachingState<RoutableKey, CommandsForKey> global) + public AccordSafeCommandsForKey(AccordCachingState<Key, CommandsForKey> global) { super((Key) global.key()); this.global = global; @@ -83,7 +82,7 @@ public class AccordSafeCommandsForKey extends SafeCommandsForKey implements Acco } @Override - public AccordCachingState<RoutableKey, CommandsForKey> global() + public AccordCachingState<Key, CommandsForKey> global() { checkNotInvalidated(); return global; diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeState.java b/src/java/org/apache/cassandra/service/accord/AccordSafeState.java index b742efb9d1..374968bcfb 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeState.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeState.java @@ -25,6 +25,8 @@ public interface AccordSafeState<K, V> extends SafeState<V> { void set(V update); V original(); + void invalidate(); + boolean invalidated(); void preExecute(); void postExecute(); AccordCachingState<K, V> global(); diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java index b5b44a7703..a4c48c83e6 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java @@ -26,17 +26,16 @@ import com.google.common.annotations.VisibleForTesting; import accord.api.Key; import accord.impl.SafeTimestampsForKey; import accord.impl.TimestampsForKey; -import accord.primitives.RoutableKey; import accord.primitives.Timestamp; -public class AccordSafeTimestampsForKey extends SafeTimestampsForKey implements AccordSafeState<RoutableKey, TimestampsForKey> +public class AccordSafeTimestampsForKey extends SafeTimestampsForKey implements AccordSafeState<Key, TimestampsForKey> { private boolean invalidated; - private final AccordCachingState<RoutableKey, TimestampsForKey> global; + private final AccordCachingState<Key, TimestampsForKey> global; private TimestampsForKey original; private TimestampsForKey current; - public AccordSafeTimestampsForKey(AccordCachingState<RoutableKey, TimestampsForKey> global) + public AccordSafeTimestampsForKey(AccordCachingState<Key, TimestampsForKey> global) { super((Key) global.key()); this.global = global; @@ -71,7 +70,7 @@ public class AccordSafeTimestampsForKey extends SafeTimestampsForKey implements } @Override - public AccordCachingState<RoutableKey, TimestampsForKey> global() + public AccordCachingState<Key, TimestampsForKey> global() { checkNotInvalidated(); return global; diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 2f790e80b7..290541adec 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -123,7 +123,7 @@ public class AccordService implements IAccordService, Shutdownable private final AccordDataStore dataStore; private final AccordJournal journal; private final CoordinateDurabilityScheduling durabilityScheduling; - private final AccordVerbHandler<? extends Request> verbHandler; + private final AccordVerbHandler<? extends Request> requestHandler; private final LocalConfig configuration; @GuardedBy("this") private State state = State.INIT; @@ -292,7 +292,7 @@ public class AccordService implements IAccordService, Shutdownable configuration); this.nodeShutdown = toShutdownable(node); this.durabilityScheduling = new CoordinateDurabilityScheduling(node); - this.verbHandler = new AccordVerbHandler<>(node, configService, journal); + this.requestHandler = new AccordVerbHandler<>(node, configService, journal); } @Override @@ -309,14 +309,14 @@ public class AccordService implements IAccordService, Shutdownable durabilityScheduling.setShardCycleTime(Ints.checkedCast(DatabaseDescriptor.getAccordShardDurabilityCycle(SECONDS)), SECONDS); durabilityScheduling.setTxnIdLag(Ints.checkedCast(DatabaseDescriptor.getAccordScheduleDurabilityTxnIdLag(SECONDS)), TimeUnit.SECONDS); durabilityScheduling.setFrequency(Ints.checkedCast(DatabaseDescriptor.getAccordScheduleDurabilityFrequency(SECONDS)), SECONDS); - durabilityScheduling.start(); +// durabilityScheduling.start(); state = State.STARTED; } @Override public IVerbHandler<? extends Request> verbHandler() { - return verbHandler; + return requestHandler; } @Override diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java index acf342bf25..426ab4061b 100644 --- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java +++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java @@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableSortedMap; import accord.api.Key; import accord.api.RoutingKey; -import accord.impl.CommandsForKey; +import accord.local.CommandsForKey; import accord.impl.CommandsSummary; import accord.local.Command; import accord.local.SaveStatus; @@ -64,7 +64,6 @@ import org.apache.cassandra.utils.IntervalTree; import static accord.local.SafeCommandStore.*; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestDep.WITH; -import static accord.local.SafeCommandStore.TestStartedAt.ANY; import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE; import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS; import static accord.local.Status.Stable; @@ -372,11 +371,13 @@ public class CommandsForRanges private static Range toRange(Interval<RoutableKey, RangeCommandSummary> interval) { - TokenKey start = (TokenKey) interval.min; - TokenKey end = (TokenKey) interval.max; + AccordRoutingKey start = (AccordRoutingKey) interval.min; + if (!(start instanceof AccordRoutingKey.SentinelKey)) + start = new TokenKey(start.table(), start.token().decreaseSlightly()); + AccordRoutingKey end = (AccordRoutingKey) interval.max; // TODO (required, correctness) : accord doesn't support wrap around, so decreaseSlightly may fail in some cases // TODO (required, correctness) : this logic is mostly used for testing, so is it actually safe for all partitioners? - return new TokenRange(start.withToken(start.token().decreaseSlightly()), end); + return new TokenRange(start, end); } @Nullable diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index f9ab58387d..33f8f2b088 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -117,6 +117,9 @@ public class AccordAgent implements Agent public boolean isExpired(TxnId initiated, long now) { // TODO: should distinguish between reads and writes + if (initiated.kind().isSyncPoint()) + return false; + return now - initiated.hlc() > getReadRpcTimeout(MICROSECONDS); } diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java index b8494e7d44..4b7607a36b 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java @@ -33,12 +33,12 @@ import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.Key; import accord.api.RoutingKey; import accord.local.KeyHistory; import accord.local.PreLoadContext; import accord.primitives.Range; import accord.primitives.Ranges; -import accord.primitives.RoutableKey; import accord.primitives.Seekables; import accord.primitives.TxnId; import accord.utils.Invariants; @@ -121,7 +121,7 @@ public class AsyncLoader } } - private void referenceAndAssembleReadsForKey(RoutableKey key, + private void referenceAndAssembleReadsForKey(Key key, AsyncOperation.Context context, List<AsyncChain<?>> listenChains) { @@ -157,7 +157,7 @@ public class AsyncLoader { case Key: // cast to Keys fails... - Iterable<RoutableKey> keys = (Iterable<RoutableKey>) keysOrRanges; + Iterable<Key> keys = (Iterable<Key>) keysOrRanges; keys.forEach(key -> referenceAndAssembleReadsForKey(key, context, chains)); break; case Range: @@ -172,7 +172,7 @@ public class AsyncLoader private AsyncChain<?> referenceAndDispatchReadsForRange(AsyncOperation.Context context) { - AsyncChain<Set<? extends RoutableKey>> overlappingKeys = findOverlappingKeys((Ranges) keysOrRanges); + AsyncChain<Set<? extends Key>> overlappingKeys = findOverlappingKeys((Ranges) keysOrRanges); return overlappingKeys.flatMap(keys -> { if (keys.isEmpty()) @@ -183,14 +183,14 @@ public class AsyncLoader }, commandStore); } - private AsyncChain<Set<? extends RoutableKey>> findOverlappingKeys(Ranges ranges) + private AsyncChain<Set<? extends Key>> findOverlappingKeys(Ranges ranges) { Invariants.checkArgument(!ranges.isEmpty()); List<AsyncChain<Set<PartitionKey>>> chains = new ArrayList<>(ranges.size()); for (Range range : ranges) chains.add(findOverlappingKeys(range)); - return AsyncChains.reduce(chains, (a, b) -> ImmutableSet.<RoutableKey>builder().addAll(a).addAll(b).build()); + return AsyncChains.reduce(chains, (a, b) -> ImmutableSet.<Key>builder().addAll(a).addAll(b).build()); } private AsyncChain<Set<PartitionKey>> findOverlappingKeys(Range range) diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index bef57bcf0b..f0c53e33d7 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import accord.api.Key; import accord.local.CommandStore; import accord.local.PreLoadContext; import accord.local.SafeCommandStore; @@ -67,8 +68,8 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R static class Context { final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>(); - final TreeMap<RoutableKey, AccordSafeTimestampsForKey> timestampsForKey = new TreeMap<>(); - final TreeMap<RoutableKey, AccordSafeCommandsForKey> commandsForKey = new TreeMap<>(); + final TreeMap<Key, AccordSafeTimestampsForKey> timestampsForKey = new TreeMap<>(); + final TreeMap<Key, AccordSafeCommandsForKey> commandsForKey = new TreeMap<>(); void releaseResources(AccordCommandStore commandStore) { diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java index 7294dd2696..49c06e811c 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java @@ -132,7 +132,6 @@ public class AccordInteropApply extends Apply implements Command.TransientListen case PreCommitted: case Committed: case PreApplied: - case ReadyToExecute: synchronized (this) { waitingOn.set(safeStore.commandStore().id()); @@ -249,7 +248,6 @@ public class AccordInteropApply extends Apply implements Command.TransientListen case PreCommitted: case Committed: case PreApplied: - case ReadyToExecute: return; case Applied: diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java index d5144cbe8d..dbe2f4845f 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java @@ -24,10 +24,10 @@ import java.util.Arrays; import com.google.common.primitives.Ints; import accord.api.Key; -import accord.impl.CommandsForKey; -import accord.impl.CommandsForKey.Info; -import accord.impl.CommandsForKey.InternalStatus; -import accord.impl.CommandsForKey.NoInfo; +import accord.local.CommandsForKey; +import accord.local.CommandsForKey.TxnInfo; +import accord.local.CommandsForKey.InternalStatus; +import accord.local.CommandsForKey.Unmanaged; import accord.local.Node; import accord.primitives.Routable.Domain; import accord.primitives.Timestamp; @@ -35,8 +35,10 @@ import accord.primitives.Txn; import accord.primitives.TxnId; import accord.utils.Invariants; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.marshal.ByteBufferAccessor; import org.apache.cassandra.utils.vint.VIntCoding; +import static accord.local.CommandsForKey.NO_PENDING_UNMANAGED; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; import static accord.primitives.Txn.Kind.Read; import static accord.primitives.Txn.Kind.Write; @@ -104,23 +106,7 @@ public class CommandsForKeySerializer { int commandCount = cfk.size(); if (commandCount == 0) - { - // TODO (expected): we should not need to special-case, but best solution here is not to store redundantBefore; - // but this requires some modest deeper changes, so for now special-case serialization when empty - Timestamp redundantBefore = cfk.redundantBefore(); - ByteBuffer out = ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(0) + - TypeSizes.sizeofUnsignedVInt(redundantBefore.epoch()) + - TypeSizes.sizeofUnsignedVInt(redundantBefore.hlc()) + - TypeSizes.sizeofUnsignedVInt(redundantBefore.flags()) + - TypeSizes.sizeofUnsignedVInt(redundantBefore.node.id)); - VIntCoding.writeUnsignedVInt32(0, out); - VIntCoding.writeUnsignedVInt(redundantBefore.epoch(), out); - VIntCoding.writeUnsignedVInt(redundantBefore.hlc(), out); - VIntCoding.writeUnsignedVInt32(redundantBefore.flags(), out); - VIntCoding.writeUnsignedVInt32(redundantBefore.node.id, out); - out.flip(); - return out; - } + return ByteBuffer.allocate(1); int[] nodeIds = cachedInts().getInts(Math.min(64, commandCount)); try @@ -129,11 +115,9 @@ public class CommandsForKeySerializer // whether we have any missing transactions to encode, any executeAt that are not equal to their TxnId // and whether there are any non-standard flag bits to encode boolean hasNonStandardFlags = false; - int nodeIdCount, missingIdCount = 0, executeAtCount = 0, bitsPerExecuteAtFlags = 0; + int nodeIdCount = 0, missingIdCount = 0, executeAtCount = 0, bitsPerExecuteAtFlags = 0; int bitsPerExecuteAtEpochDelta = 0, bitsPerExecuteAtHlcDelta = 1; // to permit us to use full 64 bits and encode in 5 bits we force at least one hlc bit { - nodeIdCount = 1; - nodeIds[0] = cfk.redundantBefore().node.id; for (int i = 0 ; i < commandCount ; ++i) { if (nodeIdCount + 1 >= nodeIds.length) @@ -143,24 +127,19 @@ public class CommandsForKeySerializer nodeIds = cachedInts().resize(nodeIds, nodeIds.length, nodeIds.length * 2); } - TxnId txnId = cfk.txnId(i); - Info info = cfk.info(i); + TxnInfo txn = cfk.get(i); - hasNonStandardFlags |= txnIdFlags(txnId) != STANDARD; - nodeIds[nodeIdCount++] = txnId.node.id; + hasNonStandardFlags |= txnIdFlags(txn) != STANDARD; + nodeIds[nodeIdCount++] = txn.node.id; - if (info.getClass() == NoInfo.class) + missingIdCount += txn.missing().length; + if (txn.executeAt == txn) continue; - missingIdCount += info.missing.length; - - if (info.executeAt == txnId) - continue; - - nodeIds[nodeIdCount++] = info.executeAt.node.id; - bitsPerExecuteAtEpochDelta = Math.max(bitsPerExecuteAtEpochDelta, numberOfBitsToRepresent(info.executeAt.epoch() - txnId.epoch())); - bitsPerExecuteAtHlcDelta = Math.max(bitsPerExecuteAtHlcDelta, numberOfBitsToRepresent(info.executeAt.hlc() - txnId.hlc())); - bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags, numberOfBitsToRepresent(info.executeAt.flags())); + nodeIds[nodeIdCount++] = txn.executeAt.node.id; + bitsPerExecuteAtEpochDelta = Math.max(bitsPerExecuteAtEpochDelta, numberOfBitsToRepresent(txn.executeAt.epoch() - txn.epoch())); + bitsPerExecuteAtHlcDelta = Math.max(bitsPerExecuteAtHlcDelta, numberOfBitsToRepresent(txn.executeAt.hlc() - txn.hlc())); + bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags, numberOfBitsToRepresent(txn.executeAt.flags())); executeAtCount += 1; } nodeIdCount = compact(nodeIds); @@ -175,8 +154,8 @@ public class CommandsForKeySerializer int maxHeaderBits = minHeaderBits; int totalBytes = 0; - long prevEpoch = cfk.redundantBefore().epoch(); - long prevHlc = cfk.redundantBefore().hlc(); + long prevEpoch = cfk.get(0).epoch(); + long prevHlc = cfk.get(0).hlc(); int[] bytesHistogram = cachedInts().getInts(12); Arrays.fill(bytesHistogram, 0); for (int i = 0 ; i < commandCount ; ++i) @@ -214,7 +193,7 @@ public class CommandsForKeySerializer if (hasNonStandardFlags && txnIdFlags(txnId) == RAW) totalBytes += 2; - Info info = cfk.info(i); + TxnInfo info = cfk.get(i); if (info.status.hasInfo) headerBits += infoHeaderBits; maxHeaderBits = Math.max(headerBits, maxHeaderBits); @@ -247,9 +226,11 @@ public class CommandsForKeySerializer // then pick third number as 75th %ile, but at least 1 less than highest, and one more than second // finally, ensure third then second are distributed so that there is no more than a gap of 4 between them and the next int l0 = Math.max(0, Math.min(3, minBasicBytes - headerBytes)); - int l1 = Math.max(l0+1, Math.min(l0+4,Arrays.binarySearch(bytesHistogram, commandCount/4) - headerBytes)); + int l1 = Arrays.binarySearch(bytesHistogram, minBasicBytes, maxBasicBytes, commandCount/4); + l1 = Math.max(l0+1, Math.min(l0+4, (l1 < 0 ? -1 - l1 : l1) - headerBytes)); int l3 = Math.max(l1+2, maxBasicBytes - headerBytes); - int l2 = Math.max(l1+1, Math.min(l3-1, Arrays.binarySearch(bytesHistogram, (3*commandCount)/4) - headerBytes)); + int l2 = Arrays.binarySearch(bytesHistogram, minBasicBytes, maxBasicBytes,(3*commandCount)/4); + l2 = Math.max(l1+1, Math.min(l3-1, (l2 < 0 ? -1 -l2 : l2) - headerBytes)); while (l3-l2 > 4) ++l2; while (l2-l1 > 4) ++l1; hlcBytesLookup = setHlcBytes(l0, l1, l2, l3); @@ -267,16 +248,13 @@ public class CommandsForKeySerializer totalBytes += TypeSizes.sizeofUnsignedVInt(nodeIds[i] - nodeIds[i-1]); totalBytes += 2; - Arrays.fill(bytesHistogram, minBasicBytes, maxBasicBytes + 1, 0); cachedInts().forceDiscard(bytesHistogram); - prevEpoch = cfk.redundantBefore().epoch(); - prevHlc = cfk.redundantBefore().hlc(); + prevEpoch = cfk.get(0).epoch(); + prevHlc = cfk.get(0).hlc(); // account for encoding redundantBefore totalBytes += TypeSizes.sizeofUnsignedVInt(prevEpoch); totalBytes += TypeSizes.sizeofUnsignedVInt(prevHlc); - totalBytes += 2; // flags TODO (expected): pack this along with uniqueIdBits, as usually zero bits should be needed - totalBytes += (bitsPerNodeId+7)/8; if (missingIdCount + executeAtCount > 0) { @@ -291,6 +269,20 @@ public class CommandsForKeySerializer totalBytes += 2; } + // count unmanaged bytes + int unmanagedPendingCommitCount = 0; + for (int i = 0 ; i < cfk.unmanagedCount() ; ++i) + { + Unmanaged unmanaged = cfk.getUnmanaged(i); + if (unmanaged.pending == Unmanaged.Pending.COMMIT) + ++unmanagedPendingCommitCount; + totalBytes += CommandSerializers.txnId.serializedSize(); + // TODO (desired): this could be more efficient, e.g. referencing one of the TxnInfo indexes for timestamp + totalBytes += CommandSerializers.timestamp.serializedSize(); + } + totalBytes += TypeSizes.sizeofUnsignedVInt(unmanagedPendingCommitCount); + totalBytes += TypeSizes.sizeofUnsignedVInt(cfk.unmanagedCount() - unmanagedPendingCommitCount); + ByteBuffer out = ByteBuffer.allocate(totalBytes); VIntCoding.writeUnsignedVInt32(commandCount, out); VIntCoding.writeUnsignedVInt32(nodeIdCount, out); @@ -301,8 +293,6 @@ public class CommandsForKeySerializer VIntCoding.writeUnsignedVInt(prevEpoch, out); VIntCoding.writeUnsignedVInt(prevHlc, out); - out.putShort((short) cfk.redundantBefore().flags()); - writeLeastSignificantBytes(Arrays.binarySearch(nodeIds, 0, nodeIdCount, cfk.redundantBefore().node.id), (bitsPerNodeId+7)/8, out); int executeAtMask = executeAtCount > 0 ? 1 : 0; int missingDepsMask = missingIdCount > 0 ? 1 : 0; @@ -311,7 +301,7 @@ public class CommandsForKeySerializer for (int i = 0 ; i < commandCount ; ++i) { TxnId txnId = cfk.txnId(i); - Info info = cfk.info(i); + TxnInfo info = cfk.get(i); InternalStatus status = info.status; long bits = status.ordinal(); @@ -322,7 +312,7 @@ public class CommandsForKeySerializer bits |= hasExecuteAt << bitIndex; bitIndex += statusHasInfo & executeAtMask; - long hasMissingIds = info.missing != CommandsForKey.NO_TXNIDS ? 1 : 0; + long hasMissingIds = info.missing() != CommandsForKey.NO_TXNIDS ? 1 : 0; bits |= hasMissingIds << bitIndex; bitIndex += statusHasInfo & missingDepsMask; @@ -392,6 +382,20 @@ public class CommandsForKeySerializer } } + VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out); + VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() - unmanagedPendingCommitCount, out); + Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT; + for (int i = 0 ; i < cfk.unmanagedCount() ; ++i) + { + Unmanaged unmanaged = cfk.getUnmanaged(i); + Invariants.checkState(unmanaged.pending == pending); + CommandSerializers.txnId.serialize(unmanaged.txnId, out, ByteBufferAccessor.instance, out.position()); + out.position(out.position() + CommandSerializers.txnId.serializedSize()); + CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, out, ByteBufferAccessor.instance, out.position()); + out.position(out.position() + CommandSerializers.timestamp.serializedSize()); + if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY; + } + if ((executeAtCount | missingIdCount) > 0) { int bitsPerCommandId = numberOfBitsToRepresent(commandCount); @@ -407,21 +411,17 @@ public class CommandsForKeySerializer for (int i = 0 ; i < commandCount ; ++i) { - Info info = cfk.info(i); - if (info.getClass() == NoInfo.class) - continue; - - TxnId txnId = cfk.txnId(i); - if (info.executeAt != txnId) + TxnInfo txn = cfk.get(i); + if (txn.executeAt != txn) { - Timestamp executeAt = info.executeAt; + Timestamp executeAt = txn.executeAt; int nodeIdx = Arrays.binarySearch(nodeIds, 0, nodeIdCount, executeAt.node.id); if (bitsPerExecuteAt <= 64) { - Invariants.checkState(executeAt.epoch() >= txnId.epoch()); - long executeAtBits = executeAt.epoch() - txnId.epoch(); + Invariants.checkState(executeAt.epoch() >= txn.epoch()); + long executeAtBits = executeAt.epoch() - txn.epoch(); int offset = bitsPerExecuteAtEpochDelta; - executeAtBits |= (executeAt.hlc() - txnId.hlc()) << offset ; + executeAtBits |= (executeAt.hlc() - txn.hlc()) << offset ; offset += bitsPerExecuteAtHlcDelta; executeAtBits |= ((long)executeAt.flags()) << offset; offset += bitsPerExecuteAtFlags; @@ -431,9 +431,9 @@ public class CommandsForKeySerializer } else { - buffer = flushBits(buffer, bufferCount, executeAt.epoch() - txnId.epoch(), bitsPerExecuteAtEpochDelta, out); + buffer = flushBits(buffer, bufferCount, executeAt.epoch() - txn.epoch(), bitsPerExecuteAtEpochDelta, out); bufferCount = (bufferCount + bitsPerExecuteAtEpochDelta) & 63; - buffer = flushBits(buffer, bufferCount, executeAt.hlc() - txnId.hlc(), bitsPerExecuteAtHlcDelta, out); + buffer = flushBits(buffer, bufferCount, executeAt.hlc() - txn.hlc(), bitsPerExecuteAtHlcDelta, out); bufferCount = (bufferCount + bitsPerExecuteAtHlcDelta) & 63; buffer = flushBits(buffer, bufferCount, executeAt.flags(), bitsPerExecuteAtFlags, out); bufferCount = (bufferCount + bitsPerExecuteAtFlags) & 63; @@ -442,16 +442,17 @@ public class CommandsForKeySerializer } } - if (info.missing.length > 0) + TxnId[] missing = txn.missing(); + if (missing.length > 0) { int j = 0; - while (j < info.missing.length - 1) + while (j < missing.length - 1) { - int missingId = cfk.indexOf(info.missing[j++]); + int missingId = cfk.indexOf(missing[j++]); buffer = flushBits(buffer, bufferCount, missingId, bitsPerMissingId, out); bufferCount = (bufferCount + bitsPerMissingId) & 63; } - int missingId = cfk.indexOf(info.missing[info.missing.length - 1]); + int missingId = cfk.indexOf(missing[missing.length - 1]); missingId |= 1L << bitsPerCommandId; buffer = flushBits(buffer, bufferCount, missingId, bitsPerMissingId, out); bufferCount = (bufferCount + bitsPerMissingId) & 63; @@ -461,6 +462,7 @@ public class CommandsForKeySerializer writeMostSignificantBytes(buffer, (bufferCount + 7)/8, out); } + Invariants.checkState(!out.hasRemaining()); out.flip(); return out; } @@ -494,16 +496,10 @@ public class CommandsForKeySerializer in = in.duplicate(); int commandCount = VIntCoding.readUnsignedVInt32(in); if (commandCount == 0) - { - long epoch = VIntCoding.readUnsignedVInt(in); - long hlc = VIntCoding.readUnsignedVInt(in); - int flags = VIntCoding.readUnsignedVInt32(in); - Node.Id id = new Node.Id(VIntCoding.readUnsignedVInt32(in)); - return new CommandsForKey(key).withoutRedundant(TxnId.fromValues(epoch, hlc, flags, id)); - } + return new CommandsForKey(key); - TxnId[] txnIds = new TxnId[commandCount]; - Info[] infos = new Info[commandCount]; + TxnId[] txnIds = cachedTxnIds().get(commandCount); + TxnInfo[] txns = new TxnInfo[commandCount]; int nodeIdCount = VIntCoding.readUnsignedVInt32(in); int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount); long nodeIdMask = (1L << bitsPerNodeId) - 1; @@ -526,12 +522,8 @@ public class CommandsForKeySerializer hlcBytesLookup = setHlcByteDeltas((flags >>> 5) & 0x3, (flags >>> 7) & 0x3, (flags >>> 9) & 0x3, (flags >>> 11) & 0x3); } - long prevEpoch = VIntCoding.readUnsignedVInt32(in); - long prevHlc = VIntCoding.readUnsignedVInt32(in); - TxnId redundantBefore = TxnId.fromValues(prevEpoch, prevHlc, in.getShort(), - nodeIds[(int)readLeastSignificantBytes((bitsPerNodeId+7)/8, in)]); - - + long prevEpoch = VIntCoding.readUnsignedVInt(in); + long prevHlc = VIntCoding.readUnsignedVInt(in); for (int i = 0 ; i < commandCount ; ++i) { long header = readLeastSignificantBytes(headerByteCount, in); @@ -601,12 +593,34 @@ public class CommandsForKeySerializer : TxnId.fromValues(epoch, hlc, flags, node); txnIds[i] = txnId; - infos[i] = DECODE_INFOS[(executeAtInfoOffset | missingDepsInfoOffset)*STATUS_COUNT + status.ordinal()]; + txns[i] = DECODE_INFOS[(executeAtInfoOffset | missingDepsInfoOffset)*STATUS_COUNT + status.ordinal()]; prevEpoch = epoch; prevHlc = hlc; } + int unmanagedPendingCommitCount = VIntCoding.readUnsignedVInt32(in); + int unmanagedCount = unmanagedPendingCommitCount + VIntCoding.readUnsignedVInt32(in); + Unmanaged[] unmanageds; + if (unmanagedCount == 0) + { + unmanageds = NO_PENDING_UNMANAGED; + } + else + { + unmanageds = new Unmanaged[unmanagedCount]; + Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT; + for (int i = 0 ; i < unmanagedCount ; ++i) + { + TxnId txnId = CommandSerializers.txnId.deserialize(in, ByteBufferAccessor.instance, in.position()); + in.position(in.position() + CommandSerializers.txnId.serializedSize()); + Timestamp waitingUntil = CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance, in.position()); + in.position(in.position() + CommandSerializers.timestamp.serializedSize()); + unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil); + if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY; + } + } + if (executeAtMasks + missingDepsMasks > 0) { TxnId[] missingIdBuffer = cachedTxnIds().get(8); @@ -630,13 +644,10 @@ public class CommandsForKeySerializer for (int i = 0 ; i < commandCount ; ++i) { - Info info = infos[i]; - if (info.getClass() == NoInfo.class) - continue; - TxnId txnId = txnIds[i]; - Timestamp executeAt = txnId; - if (info.executeAt == null) + TxnInfo placeholder = txns[i]; + Timestamp executeAt; + if (placeholder.executeAt == null) { long epoch, hlc; int flags; @@ -661,8 +672,12 @@ public class CommandsForKeySerializer } executeAt = Timestamp.fromValues(epoch, hlc, flags, id); } + else + { + executeAt = txnId; + } - TxnId[] missing = info.missing; + TxnId[] missing = placeholder.missing(); if (missing == null) { int prev = -1; @@ -684,13 +699,19 @@ public class CommandsForKeySerializer missingIdCount = 0; } - infos[i] = Info.create(txnId, info.status, executeAt, missing); + txns[i] = TxnInfo.create(txnId, placeholder.status, executeAt, missing); } cachedTxnIds().forceDiscard(missingIdBuffer, maxIdBufferCount); } + else + { + for (int i = 0 ; i < commandCount ; ++i) + txns[i] = TxnInfo.create(txnIds[i], txns[i].status, txnIds[i]); + } + cachedTxnIds().forceDiscard(txnIds, commandCount); - return CommandsForKey.SerializerSupport.create(key, redundantBefore, txnIds, infos); + return CommandsForKey.SerializerSupport.create(key, txns, unmanageds); } private static int getHlcBytes(int lookup, int index) @@ -833,16 +854,16 @@ public class CommandsForKeySerializer private static final Txn.Kind[] TXN_ID_FLAG_BITS_KIND_LOOKUP = new Txn.Kind[] { Read, Write, ExclusiveSyncPoint, null }; private static final int STATUS_COUNT = InternalStatus.values().length; - private static final Info[] DECODE_INFOS = new Info[4 * STATUS_COUNT]; + private static final TxnInfo[] DECODE_INFOS = new TxnInfo[4 * STATUS_COUNT]; static { for (InternalStatus status : InternalStatus.values()) { int ordinal = status.ordinal(); - DECODE_INFOS[ordinal] = status.asNoInfo; - DECODE_INFOS[STATUS_COUNT+ordinal] = Info.createMock(status, Timestamp.NONE, null); - DECODE_INFOS[2*STATUS_COUNT+ordinal] = Info.createMock(status, null, CommandsForKey.NO_TXNIDS); - DECODE_INFOS[3*STATUS_COUNT+ordinal] = Info.createMock(status, null, null); + DECODE_INFOS[ordinal] = TxnInfo.createMock(TxnId.NONE, status, TxnId.NONE, CommandsForKey.NO_TXNIDS); + DECODE_INFOS[STATUS_COUNT+ordinal] = TxnInfo.createMock(TxnId.NONE, status, TxnId.NONE, null); + DECODE_INFOS[2*STATUS_COUNT+ordinal] = TxnInfo.createMock(TxnId.NONE, status, null, CommandsForKey.NO_TXNIDS); + DECODE_INFOS[3*STATUS_COUNT+ordinal] = TxnInfo.createMock(TxnId.NONE, status, null, null); } } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java index 930807d7f0..3efb9e2c6c 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java @@ -22,40 +22,63 @@ import java.io.IOException; import java.nio.ByteBuffer; import accord.local.Command.WaitingOn; -import accord.primitives.Deps; +import accord.primitives.Keys; +import accord.primitives.Routable; +import accord.primitives.TxnId; import accord.utils.ImmutableBitSet; import accord.utils.Invariants; import accord.utils.SimpleBitSet; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.vint.VIntCoding; public class WaitingOnSerializer { - public static void serialize(WaitingOn waitingOn, DataOutputPlus out) throws IOException + public static void serialize(TxnId txnId, WaitingOn waitingOn, DataOutputPlus out) throws IOException { - // TODO (expected): use run length encoding; we know that at most 1/3rd of bits will be set between the three bitsets - int length = (waitingOn.deps.txnIdCount() + 63) / 64; - serialize(length, waitingOn.waitingOnCommit, out); - serialize(length, waitingOn.waitingOnApply, out); - serialize(length, waitingOn.appliedOrInvalidated, out); + out.writeUnsignedVInt32(waitingOn.keys.size()); + out.writeUnsignedVInt32(waitingOn.txnIds.size()); + int keyCount = waitingOn.keys.size(); + int txnIdCount = waitingOn.txnIds.size(); + int waitingOnLength = (txnIdCount + keyCount + 63) / 64; + serialize(waitingOnLength, waitingOn.waitingOn, out); + if (txnId.domain() == Routable.Domain.Range) + { + int appliedOrInvalidatedLength = (txnIdCount + 63) / 64; + serialize(appliedOrInvalidatedLength, waitingOn.appliedOrInvalidated, out); + } } - public static WaitingOn deserialize(Deps deps, DataInputPlus in) throws IOException + public static WaitingOn deserialize(TxnId txnId, Keys keys, SortedArrayList<TxnId> txnIds, DataInputPlus in) throws IOException { - int length = (deps.txnIdCount() + 63) / 64; - ImmutableBitSet waitingOnCommit = deserialize(length, in); - ImmutableBitSet waitingOnApply = deserialize(length, in); - ImmutableBitSet appliedOrInvalidated = deserialize(length, in); - return new WaitingOn(deps, waitingOnCommit, waitingOnApply, appliedOrInvalidated); + int a = in.readUnsignedVInt32(); + int b = in.readUnsignedVInt32(); + int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64; + ImmutableBitSet waitingOn = deserialize(waitingOnLength, in); + ImmutableBitSet appliedOrInvalidated = null; + if (txnId.domain() == Routable.Domain.Range) + { + int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64; + appliedOrInvalidated = deserialize(appliedOrInvalidatedLength, in); + } + return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated); } public static long serializedSize(WaitingOn waitingOn) { - int length = (waitingOn.deps.txnIdCount() + 63) / 64; - return serializedSize(length, waitingOn.waitingOnCommit) - + serializedSize(length, waitingOn.waitingOnApply) - + serializedSize(length, waitingOn.appliedOrInvalidated); + int keyCount = waitingOn.keys.size(); + int txnIdCount = waitingOn.txnIds.size(); + int waitingOnLength = (txnIdCount + keyCount + 63) / 64; + long size = serializedSize(waitingOnLength, waitingOn.waitingOn); + size += TypeSizes.sizeofUnsignedVInt(keyCount); + size += TypeSizes.sizeofUnsignedVInt(txnIdCount); + if (waitingOn.appliedOrInvalidated == null) + return size; + + int appliedOrInvalidatedLength = (txnIdCount + 63) / 64; + return size + serializedSize(appliedOrInvalidatedLength, waitingOn.appliedOrInvalidated); } private static void serialize(int length, SimpleBitSet write, DataOutputPlus out) throws IOException @@ -81,14 +104,23 @@ public class WaitingOnSerializer return (long) TypeSizes.LONG_SIZE * length; } - public static ByteBuffer serialize(WaitingOn waitingOn) throws IOException + public static ByteBuffer serialize(TxnId txnId, WaitingOn waitingOn) throws IOException { - int length = (waitingOn.deps.txnIdCount() + 63) / 64; - ByteBuffer out = ByteBuffer.allocate(TypeSizes.LONG_SIZE * length * 3); - serialize(length, waitingOn.waitingOnCommit, out); - serialize(length, waitingOn.waitingOnApply, out); - serialize(length, waitingOn.appliedOrInvalidated, out); - return (ByteBuffer) out.flip(); + int keyCount = waitingOn.keys.size(); + int txnIdCount = waitingOn.txnIds.size(); + int waitingOnLength = (txnIdCount + keyCount + 63) / 64; + int appliedOrInvalidatedLength = 0; + if (txnId.domain() == Routable.Domain.Range) + appliedOrInvalidatedLength = (txnIdCount + 63) / 64; + + ByteBuffer out = ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(keyCount) + TypeSizes.sizeofUnsignedVInt(txnIdCount) + + TypeSizes.LONG_SIZE * (waitingOnLength + appliedOrInvalidatedLength)); + VIntCoding.writeUnsignedVInt32(keyCount, out); + VIntCoding.writeUnsignedVInt32(txnIdCount, out); + serialize(waitingOnLength, waitingOn.waitingOn, out); + if (appliedOrInvalidatedLength > 0) + serialize(appliedOrInvalidatedLength, waitingOn.appliedOrInvalidated, out); + return out.flip(); } private static void serialize(int length, SimpleBitSet write, ByteBuffer out) @@ -99,16 +131,23 @@ public class WaitingOnSerializer out.putLong(bits[i]); } - public static WaitingOn deserialize(Deps deps, ByteBuffer in) throws IOException + public static WaitingOn deserialize(TxnId txnId, Keys keys, SortedArrayList<TxnId> txnIds, ByteBuffer in) throws IOException { - int length = (deps.txnIdCount() + 63) / 64; + int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64; int position = in.position(); - ImmutableBitSet waitingOnCommit = deserialize(position, length, in); - position += length*8; - ImmutableBitSet waitingOnApply = deserialize(position, length, in); - position += length*8; - ImmutableBitSet appliedOrInvalidated = deserialize(position, length, in); - return new WaitingOn(deps, waitingOnCommit, waitingOnApply, appliedOrInvalidated); + int a = VIntCoding.readUnsignedVInt32(in, position); + position += TypeSizes.sizeofUnsignedVInt(a); + int b = VIntCoding.readUnsignedVInt32(in, position); + position += TypeSizes.sizeofUnsignedVInt(a); + ImmutableBitSet waitingOn = deserialize(position, waitingOnLength, in); + ImmutableBitSet appliedOrInvalidated = null; + if (txnId.domain() == Routable.Domain.Range) + { + position += waitingOnLength*8; + int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64; + appliedOrInvalidated = deserialize(position, appliedOrInvalidatedLength, in); + } + return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated); } private static ImmutableBitSet deserialize(int position, int length, ByteBuffer in) diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java index ec85d63f2c..5e3e9ff831 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java @@ -35,9 +35,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.DataStore; +import accord.api.Key; import accord.api.Write; import accord.impl.AbstractSafeCommandStore; -import accord.impl.CommandsForKeys; +import accord.impl.TimestampsForKeys; import accord.impl.TimestampsForKey; import accord.local.SafeCommandStore; import accord.primitives.PartialTxn; @@ -375,7 +376,7 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ // TODO (expected, efficiency): 99.9999% of the time we can just use executeAt.hlc(), so can avoid bringing // cfk into memory by retaining at all times in memory key ranges that are dirty and must use this logic; // any that aren't can just use executeAt.hlc - TimestampsForKey cfk = CommandsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?,?,?>) safeStore, (RoutableKey) key, executeAt, true); + TimestampsForKey cfk = TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?,?,?>) safeStore, (Key) key, executeAt, true); long timestamp = AccordSafeTimestampsForKey.timestampMicrosFor(cfk, executeAt, true); // TODO (low priority - do we need to compute nowInSeconds, or can we just use executeAt?) int nowInSeconds = AccordSafeTimestampsForKey.nowInSecondsFor(cfk, executeAt, true); diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java index 8b52bb41e4..dc873f0210 100644 --- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java +++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java @@ -126,13 +126,27 @@ public class VIntCoding return retval; } + @DontInline + private static long readUnsignedVIntSlow(ByteBuffer in, int position, byte firstByte) + { + int size = numberOfExtraBytesToRead(firstByte); + long retval = firstByte & firstByteValueMask(size); + for (int ii = 0; ii < size; ii++) + { + byte b = in.get(position++); + retval <<= 8; + retval |= b & 0xff; + } + + return retval; + } + public static long readUnsignedVInt(ByteBuffer in) { byte firstByte = in.get(); if (firstByte >= 0) return firstByte; - int position = in.position(); int limit = in.limit(); if (limit - position < 8) @@ -155,6 +169,32 @@ public class VIntCoding return retval; } + public static long readUnsignedVInt(ByteBuffer in, int position) + { + byte firstByte = in.get(position++); + if (firstByte >= 0) + return firstByte; + + int limit = in.limit(); + if (limit - position < 8) + return readUnsignedVIntSlow(in, position, firstByte); + + int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte); + int extraBits = extraBytes * 8; + + long retval = in.getLong(position); + if (in.order() == ByteOrder.LITTLE_ENDIAN) + retval = Long.reverseBytes(retval); + + // truncate the bytes we read in excess of those we needed + retval >>>= 64 - extraBits; + // remove the non-value bits from the first byte + firstByte &= VIntCoding.firstByteValueMask(extraBytes); + // shift the first byte up to its correct position + retval |= (long) firstByte << extraBits; + return retval; + } + public static void skipUnsignedVInt(DataInputPlus input) throws IOException { int firstByte = input.readByte(); @@ -330,6 +370,11 @@ public class VIntCoding return checkedCast(readUnsignedVInt(input)); } + public static int readUnsignedVInt32(ByteBuffer input, int position) + { + return checkedCast(readUnsignedVInt(input, position)); + } + // & this with the first byte to give the value part for a given extraBytesToRead encoded in the byte public static int firstByteValueMask(int extraBytesToRead) { diff --git a/test/conf/logback-dtest-quiet.xml b/test/conf/logback-dtest-quiet.xml new file mode 100644 index 0000000000..bb9f983177 --- /dev/null +++ b/test/conf/logback-dtest-quiet.xml @@ -0,0 +1,56 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<configuration debug="false" scan="true" scanPeriod="60 seconds"> + <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" /> + <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" /> + + <!-- Shutdown hook ensures that async appender flushes --> + <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender"> + <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + <immediateFlush>true</immediateFlush> + </appender> + + <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>ERROR</level> + </filter> + </appender> + + <!-- Un-comment to enable TRACE logging for Accord transactions. + <logger name="accord" level="TRACE" /> + <logger name="org.apache.cassandra.service.accord" level="TRACE" /> + --> + + <root level="DEBUG"> + <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching --> + <appender-ref ref="INSTANCESTDERR" /> + </root> +</configuration> diff --git a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java index aee6aeaeb8..5b27d3d44f 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java @@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.api; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.Future; +import java.util.function.BiConsumer; import org.apache.cassandra.distributed.shared.FutureUtils; @@ -60,6 +61,8 @@ public interface ICoordinator } SimpleQueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues); + Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> callback, String query, ConsistencyLevel consistencyLevel, Object... boundValues); + Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> callback, String query, ConsistencyLevel serialConsistencyLevel, ConsistencyLevel commitConsistencyLevel, Object... boundValues); default SimpleQueryResult executeWithResult(String query, ConsistencyLevel serialConsistencyLevel, ConsistencyLevel commitConsistencyLevel, Object... boundValues) { diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index 19d80626d3..e89ab8faa3 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.UUID; import java.util.concurrent.Future; +import java.util.function.BiConsumer; import com.google.common.collect.Iterators; @@ -66,6 +67,30 @@ public class Coordinator implements ICoordinator return instance().sync(() -> unsafeExecuteInternal(query, consistencyLevel, boundValues)).call(); } + @Override + public Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> callback, String query, ConsistencyLevel consistencyLevel, Object... boundValues) + { + return executeWithResult(callback, query, null, consistencyLevel, boundValues); + } + + @Override + public Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> callback, String query, ConsistencyLevel serialConsistencyLevel, ConsistencyLevel commitConsistencyLevel, Object... boundValues) + { + return instance().async(cb -> { + SimpleQueryResult result; + try + { + result = unsafeExecuteInternal(query, serialConsistencyLevel, commitConsistencyLevel, boundValues); + } + catch (Throwable t) + { + callback.accept(null, t); + return; + } + callback.accept(result, null); + }).apply(callback); + } + public Future<SimpleQueryResult> asyncExecuteWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues) { return instance.async(() -> { diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index e2036a2382..464bc3b8bf 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -492,7 +492,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance @Override public void receiveMessage(IMessage message) { - sync(receiveMessageRunnable(message)).accept(false); + async(receiveMessageRunnable(message)).apply(false); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java new file mode 100644 index 0000000000..42e7fbf34a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.util.Date; +import java.util.Random; +import java.util.concurrent.Semaphore; + +import com.google.common.util.concurrent.RateLimiter; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.utils.EstimatedHistogram; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class AccordLoadTest extends AccordTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(AccordLoadTest.class); + + @BeforeClass + public static void setUp() throws IOException + { + AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("lwt_strategy", "accord").set("non_serial_write_strategy", "accord")), 2); + } + + @Ignore + @Test + public void testLoad() throws Exception + { + test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY KEY(k))", + cluster -> { + ICoordinator coordinator = cluster.coordinator(1); + final int batchSize = 1000; + final int concurrency = 100; + final int ratePerSecond = 1000; + final int keyCount = 10; + for (int i = 1; i <= keyCount; i++) + coordinator.execute("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (0, 0) USING TIMESTAMP 0;", ConsistencyLevel.ALL, i); + + Random random = new Random(); +// CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>(); + final Semaphore inFlight = new Semaphore(concurrency); + final RateLimiter rateLimiter = RateLimiter.create(ratePerSecond); + long testStart = System.nanoTime(); +// while (NANOSECONDS.toMinutes(System.nanoTime() - testStart) < 10 && exceptions.size() < 10000) + while (true) + { + final EstimatedHistogram histogram = new EstimatedHistogram(200); + long batchStart = System.nanoTime(); + for (int i = 0 ; i < batchSize ; ++i) + { + inFlight.acquire(); + rateLimiter.acquire(); + long commandStart = System.nanoTime(); + coordinator.executeWithResult((success, fail) -> { + inFlight.release(); + if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart)); +// else exceptions.add(fail); + }, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount)); + } + System.out.printf("%tT rate: %.2f/s\n", new Date(), (((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() - batchStart))); + System.out.printf("%tT percentiles: %d %d %d %d\n", new Date(), histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, histogram.percentile(.75)/1000, histogram.percentile(1)/1000); + } + } + ); + } + + @Override + protected Logger logger() + { + return logger; + } +} diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index 95c495a350..ee38ffceb9 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; import accord.api.Key; import accord.api.Result; -import accord.impl.CommandsForKey; +import accord.local.CommandsForKey; import accord.local.CheckedCommands; import accord.local.Command; import accord.local.CommandStore; diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java index af4e3f738b..fdaf519874 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java @@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory; import accord.api.Key; import accord.api.Result; -import accord.impl.CommandsForKey; -import accord.impl.CommandsForKeys; +import accord.local.CommandsForKey; +import accord.impl.TimestampsForKeys; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.CommonAttributes; @@ -41,10 +41,13 @@ import accord.messages.Apply; import accord.primitives.Ballot; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; +import accord.primitives.Range; import accord.primitives.Ranges; +import accord.primitives.Routable; import accord.primitives.Route; import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; +import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.ImmutableBitSet; @@ -102,20 +105,20 @@ public class AccordCommandStoreTest AtomicLong clock = new AtomicLong(0); PartialTxn depTxn = createPartialTxn(0); Key key = (Key)depTxn.keys().get(0); + Range range = key.toUnseekable().asRange(); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES (0, 0, 1)"); TableId tableId = Schema.instance.getTableMetadata("ks", "tbl").id; - TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1); - TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1); - TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1); - TxnId txnId = txnId(1, clock.incrementAndGet(), 1); + TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write, Routable.Domain.Range); + TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write, Routable.Domain.Range); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write, Routable.Domain.Range); PartialDeps dependencies; try (PartialDeps.Builder builder = PartialDeps.builder(depTxn.covering())) { - builder.add(key, oldTxnId1); - builder.add(key, oldTxnId2); + builder.add(range, oldTxnId1); + builder.add(range, oldTxnId2); dependencies = builder.build(); } @@ -130,11 +133,9 @@ public class AccordCommandStoreTest Ballot accepted = ballot(1, clock.incrementAndGet(), 1); Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1); attrs.partialDeps(dependencies); - SimpleBitSet waitingOnCommit = new SimpleBitSet(2); - waitingOnCommit.set(0); - SimpleBitSet waitingOnApply = new SimpleBitSet(2); + SimpleBitSet waitingOnApply = new SimpleBitSet(3); waitingOnApply.set(1); - Command.WaitingOn waitingOn = new Command.WaitingOn(dependencies, new ImmutableBitSet(waitingOnCommit), new ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2)); + Command.WaitingOn waitingOn = new Command.WaitingOn(dependencies.keyDeps.keys(), dependencies.rangeDeps.txnIds(), new ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2)); attrs.addListener(new Command.ProxyListener(oldTxnId1)); Pair<Writes, Result> result = AccordTestUtils.processTxnResult(commandStore, txnId, txn, executeAt); @@ -183,10 +184,10 @@ public class AccordCommandStoreTest AccordSafeTimestampsForKey tfk = new AccordSafeTimestampsForKey(loaded(key, null)); tfk.initialize(); - CommandsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId1, true); + TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId1, true); Assert.assertEquals(txnId1.hlc(), AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId1, true)); - CommandsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId2, true); + TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId2, true); Assert.assertEquals(txnId2.hlc(), AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId2, true)); Assert.assertEquals(txnId2, tfk.current().lastExecutedTimestamp()); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index 5b1a1402a2..71e821d329 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -26,7 +26,7 @@ import org.junit.Test; import accord.api.Key; import accord.api.RoutingKey; -import accord.impl.CommandsForKey; +import accord.local.CommandsForKey; import accord.local.Command; import accord.local.KeyHistory; import accord.local.Node; @@ -117,7 +117,7 @@ public class AccordCommandTest Assert.assertEquals(Status.PreAccepted, command.status()); Assert.assertTrue(command.partialDeps() == null || command.partialDeps().isEmpty()); - CommandsForKey cfk = ((AccordSafeCommandStore) instance).commandsForKey(key(1)).current(); + CommandsForKey cfk = ((AccordSafeCommandStore) instance).get(key(1)).current(); Assert.assertTrue(cfk.indexOf(txnId) >= 0); })); @@ -145,7 +145,7 @@ public class AccordCommandTest Assert.assertEquals(Status.Accepted, command.status()); Assert.assertEquals(deps, command.partialDeps()); - CommandsForKey cfk = ((AccordSafeCommandStore) instance).commandsForKey(key(1)).current(); + CommandsForKey cfk = ((AccordSafeCommandStore) instance).get(key(1)).current(); Assert.assertTrue(cfk.indexOf(txnId) >= 0); })); @@ -160,7 +160,7 @@ public class AccordCommandTest Assert.assertTrue(command.hasBeen(Status.Committed)); Assert.assertEquals(commit.partialDeps, command.partialDeps()); - CommandsForKey cfk = ((AccordSafeCommandStore) instance).commandsForKey(key(1)).current(); + CommandsForKey cfk = ((AccordSafeCommandStore) instance).get(key(1)).current(); Assert.assertTrue(cfk.indexOf(txnId) >= 0); })); } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index 66d0c8e436..0ae2a6abc8 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -30,6 +30,8 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nullable; + import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.Assert; @@ -59,6 +61,7 @@ import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Participants; import accord.primitives.Ranges; +import accord.primitives.Routable; import accord.primitives.Route; import accord.primitives.Seekable; import accord.primitives.Seekables; @@ -199,8 +202,8 @@ public class AccordTestUtils @Override public void executed(Command command, ProgressShard progressShard) {} @Override public void clear(TxnId txnId) {} @Override public void durable(Command command) {} - @Override - public void waiting(SafeCommand blockedBy, LocalExecution blockedUntil, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants) {} + @Override public void waiting(SafeCommand blockedBy, LocalExecution blockedUntil, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants) {} + @Override public void waiting(TxnId blockedBy, LocalExecution blockedUntil, @Nullable Route<?> blockedOnRoute, @Nullable Participants<?> blockedOnParticipants) {} }; public static TxnId txnId(long epoch, long hlc, int node) @@ -213,6 +216,11 @@ public class AccordTestUtils return new TxnId(epoch, hlc, kind, Key, new Node.Id(node)); } + public static TxnId txnId(long epoch, long hlc, int node, Txn.Kind kind, Routable.Domain domain) + { + return new TxnId(epoch, hlc, kind, domain, new Node.Id(node)); + } + public static Timestamp timestamp(long epoch, long hlc, int node) { return Timestamp.fromValues(epoch, hlc, new Node.Id(node)); diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java index 8c952f02c0..33f0dc2458 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java @@ -32,13 +32,12 @@ import org.junit.BeforeClass; import org.junit.Test; import accord.api.Key; -import accord.impl.CommandsForKey; +import accord.local.CommandsForKey; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.KeyHistory; import accord.primitives.Keys; import accord.primitives.PartialTxn; -import accord.primitives.RoutableKey; import accord.primitives.TxnId; import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; @@ -96,7 +95,7 @@ public class AsyncLoaderTest AccordStateCache.Instance<TxnId, Command, AccordSafeCommand> commandCache = commandStore.commandCache(); commandStore.executeBlocking(() -> commandStore.setCapacity(1024)); - AccordStateCache.Instance<RoutableKey, TimestampsForKey, AccordSafeTimestampsForKey> timestampsCache = commandStore.timestampsForKeyCache(); + AccordStateCache.Instance<Key, TimestampsForKey, AccordSafeTimestampsForKey> timestampsCache = commandStore.timestampsForKeyCache(); TxnId txnId = txnId(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(0); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); @@ -375,7 +374,7 @@ public class AsyncLoaderTest @Test public void inProgressCFKSaveTest() { - inProgressCFKSaveTest(COMMANDS, AccordCommandStore::commandsForKeyCache, context -> context.commandsForKey, CommandsForKey::new, (cfk, u) -> cfk.update(null, u)); + this.inProgressCFKSaveTest(COMMANDS, AccordCommandStore::commandsForKeyCache, context -> context.commandsForKey, CommandsForKey::new, (cfk, u) -> cfk.update(null, u)); } @Test @@ -384,7 +383,7 @@ public class AsyncLoaderTest inProgressCFKSaveTest(TIMESTAMPS, AccordCommandStore::timestampsForKeyCache, context -> context.timestampsForKey, TimestampsForKey::new, (tfk, c) -> new TimestampsForKey(tfk.key(), c.executeAt(), c.executeAt().hlc(), c.executeAt())); } - private <T1, T2 extends AccordSafeState<RoutableKey, T1>, C extends AccordStateCache.Instance<RoutableKey, T1, T2>> void inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C> getter, Function<Context, TreeMap<?, ?>> inContext, Function<Key, T1> initialiser, BiFunction<T1, Command, T1> update) + private <T1, T2 extends AccordSafeState<Key, T1>, C extends AccordStateCache.Instance<Key, T1, T2>> void inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C> getter, Function<Context, TreeMap<?, ?>> inContext, Function<Key, T1> initialiser, BiFunction<T1, Command, T1> update) { AtomicLong clock = new AtomicLong(0); ManualExecutor executor = new ManualExecutor(); diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java index b235d1ae59..a5ece26a51 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java @@ -36,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.RoutingKey; -import accord.impl.SafeCommandsForKey; +import accord.local.SafeCommandsForKey; import accord.local.CheckedCommands; import accord.local.Command; import accord.local.PreLoadContext; @@ -53,7 +53,6 @@ import accord.primitives.PartialDeps; import accord.primitives.PartialRoute; import accord.primitives.PartialTxn; import accord.primitives.Ranges; -import accord.primitives.RoutableKey; import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.Txn; @@ -259,7 +258,7 @@ public class AsyncOperationTest Accept accept = Accept.SerializerSupport.create(txnId, partialRoute, txnId.epoch(), txnId.epoch(), false, Ballot.ZERO, executeAt, partialTxn.keys(), deps); Commit commit = - Commit.SerializerSupport.create(txnId, partialRoute, txnId.epoch(), Commit.Kind.Commit, Ballot.ZERO, executeAt, partialTxn.keys(), partialTxn, deps, route, null); + Commit.SerializerSupport.create(txnId, partialRoute, txnId.epoch(), Commit.Kind.CommitSlowPath, Ballot.ZERO, executeAt, partialTxn.keys(), partialTxn, deps, route, null); Commit stable = Commit.SerializerSupport.create(txnId, partialRoute, txnId.epoch(), Commit.Kind.StableSlowPath, Ballot.ZERO, executeAt, partialTxn.keys(), partialTxn, deps, route, null); @@ -482,8 +481,7 @@ public class AsyncOperationTest } try { - //TODO this is due to bad typing for Instance, it doesn't use ? extends RoutableKey - assertNoReferences(commandStore.commandsForKeyCache(), (Iterable<RoutableKey>) (Iterable<?>) keys); + assertNoReferences(commandStore.commandsForKeyCache(), keys); } catch (AssertionError e) { @@ -524,8 +522,7 @@ public class AsyncOperationTest private static void awaitDone(AccordCommandStore commandStore, List<TxnId> ids, Keys keys) { awaitDone(commandStore.commandCache(), ids); - //TODO this is due to bad typing for Instance, it doesn't use ? extends RoutableKey - awaitDone(commandStore.commandsForKeyCache(), (Iterable<RoutableKey>) (Iterable<?>) keys); + awaitDone(commandStore.commandsForKeyCache(), keys); } private static <T> void awaitDone(AccordStateCache.Instance<T, ?, ?> cache, Iterable<T> keys) diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index e12b3fbf87..405f92dc59 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -38,9 +38,10 @@ import org.junit.BeforeClass; import org.junit.Test; import accord.api.Key; -import accord.impl.CommandsForKey; -import accord.impl.CommandsForKey.InternalStatus; +import accord.local.CommandsForKey; +import accord.local.CommandsForKey.InternalStatus; import accord.local.Command; +import accord.local.CommandsForKey.TxnInfo; import accord.local.CommonAttributes; import accord.local.CommonAttributes.Mutable; import accord.local.Listeners; @@ -241,9 +242,12 @@ public class CommandsForKeySerializerTest List<TxnId> deps = cmds[i].deps; List<TxnId> missing = cmds[i].missing; for (int j = 0 ; j < limit ; ++j) - if (i != j) deps.add(cmds[j].txnId); + { + if (i != j && cmds[i].txnId.kind().witnesses(cmds[j].txnId)) + deps.add(cmds[j].txnId); + } - int missingCount = Math.min(limit - (limit > i ? 1 : 0), missingCountSupplier.getAsInt()); + int missingCount = Math.min(deps.size(), missingCountSupplier.getAsInt()); while (missingCount > 0) { int remove = source.nextInt(deps.size()); @@ -267,14 +271,14 @@ public class CommandsForKeySerializerTest { InternalStatus status = InternalStatus.from(cmds[j].saveStatus); if (status == null || !status.hasInfo) continue; - if (status.depsKnownBefore(cmds[j].txnId, cmds[j].executeAt).compareTo(cmds[i].txnId) > 0 && Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0) + if (cmds[j].txnId.kind().witnesses(cmds[i].txnId) && status.depsKnownBefore(cmds[j].txnId, cmds[j].executeAt).compareTo(cmds[i].txnId) > 0 && Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0) continue outer; } for (int j = i + 1 ; j < cmds.length ; ++j) { InternalStatus status = InternalStatus.from(cmds[j].saveStatus); if (status == null || !status.hasInfo) continue; - if (Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0) + if (cmds[j].txnId.kind().witnesses(cmds[i].txnId) && Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0) continue outer; } cmds[i].invisible = true; @@ -322,7 +326,7 @@ public class CommandsForKeySerializerTest @Test public void serde() { -// testOne(1821931462020409370L); + testOne(-6946067792202944553L); Random random = new Random(); for (int i = 0 ; i < 10000 ; ++i) { @@ -426,13 +430,13 @@ public class CommandsForKeySerializerTest Assert.assertTrue(cmd.invisible); continue; } - CommandsForKey.Info info = cfk.info(i); + TxnInfo info = cfk.get(i); InternalStatus expectStatus = InternalStatus.from(cmd.saveStatus); if (expectStatus == null) expectStatus = InternalStatus.TRANSITIVELY_KNOWN; if (expectStatus.hasInfo) - Assert.assertEquals(cmd.executeAt, info.executeAt(cfk.txnId(i))); + Assert.assertEquals(cmd.executeAt, info.executeAt); Assert.assertEquals(expectStatus, info.status); - Assert.assertArrayEquals(cmd.missing.toArray(TxnId[]::new), info.missing); + Assert.assertArrayEquals(cmd.missing.toArray(TxnId[]::new), info.missing()); ++i; } @@ -461,11 +465,12 @@ public class CommandsForKeySerializerTest next = txnIdGen.next(rs0); return next; }).unique().ofSizeBetween(0, 10).next(rs); - CommandsForKey.Info[] info = new CommandsForKey.Info[ids.length]; + TxnInfo[] info = new TxnInfo[ids.length]; for (int i = 0; i < info.length; i++) - info[i] = rs.pick(InternalStatus.values()).asNoInfo; - Arrays.sort(ids, Comparator.naturalOrder()); - CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk, redudentBefore, ids, info); + info[i] = TxnInfo.create(ids[i], rs.pick(InternalStatus.values()), ids[i], CommandsForKey.NO_TXNIDS); + Arrays.sort(info, Comparator.naturalOrder()); + + CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk, info, CommandsForKey.NO_PENDING_UNMANAGED); ByteBuffer buffer = CommandsForKeySerializer.toBytesWithoutKey(expected); CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, buffer); @@ -479,10 +484,10 @@ public class CommandsForKeySerializerTest long tokenValue = -2311778975040348869L; DecoratedKey key = Murmur3Partitioner.instance.decorateKey(Murmur3Partitioner.LongToken.keyForToken(tokenValue)); PartitionKey pk = new PartitionKey(TableId.fromString("1b255f4d-ef25-40a6-0000-000000000009"), key); + TxnId txnId = TxnId.fromValues(11,34052499,2,1); CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk, - TxnId.fromValues(0,0,0,0), - new TxnId[] {TxnId.fromValues(11,34052499,2,1)}, - new CommandsForKey.Info[] { InternalStatus.PREACCEPTED.asNoInfo}); + new TxnInfo[] { TxnInfo.create(txnId, InternalStatus.PREACCEPTED, txnId, CommandsForKey.NO_TXNIDS) }, + CommandsForKey.NO_PENDING_UNMANAGED); ByteBuffer buffer = CommandsForKeySerializer.toBytesWithoutKey(expected); CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, buffer); diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java index 5d5fc1bf56..3df7d87e0d 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java @@ -23,6 +23,8 @@ import org.junit.Test; import accord.local.Command; import accord.primitives.Deps; +import accord.primitives.Routable; +import accord.primitives.TxnId; import accord.utils.Gen; import accord.utils.Gens; import accord.utils.SimpleBitSet; @@ -52,19 +54,20 @@ public class WaitingOnSerializerTest { DataOutputBuffer buffer = new DataOutputBuffer(); qt().forAll(waitingOnGen()).check(waitingOn -> { + TxnId txnId = TxnId.NONE; + if (waitingOn.appliedOrInvalidated != null) txnId = new TxnId(txnId.epoch(), txnId.hlc(), txnId.kind(), Routable.Domain.Range, txnId.node); buffer.clear(); long expectedSize = WaitingOnSerializer.serializedSize(waitingOn); - WaitingOnSerializer.serialize(waitingOn, buffer); + WaitingOnSerializer.serialize(txnId, waitingOn, buffer); Assertions.assertThat(buffer.getLength()).isEqualTo(expectedSize); - Command.WaitingOn read = WaitingOnSerializer.deserialize(waitingOn.deps, new DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false)); + Command.WaitingOn read = WaitingOnSerializer.deserialize(txnId, waitingOn.keys, waitingOn.txnIds, new DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false)); Assertions.assertThat(read) .isEqualTo(waitingOn) - .isEqualTo(WaitingOnSerializer.deserialize(waitingOn.deps, WaitingOnSerializer.serialize(waitingOn))); + .isEqualTo(WaitingOnSerializer.deserialize(txnId, waitingOn.keys, waitingOn.txnIds, WaitingOnSerializer.serialize(txnId, waitingOn))); }); } - private enum WaitingOnSets - {COMMIT, APPLY, APPLYED_OR_INVALIDATED} + private enum WaitingOnSets { APPLY, APPLIED_OR_INVALIDATED } private static Gen<Command.WaitingOn> waitingOnGen() { @@ -75,22 +78,20 @@ public class WaitingOnSerializerTest return rs -> { Deps deps = depsGen.next(rs); if (deps.isEmpty()) return Command.WaitingOn.EMPTY; - int[] selected = Gens.arrays(Gens.ints().between(0, deps.txnIdCount() - 1)).unique().ofSizeBetween(0, deps.txnIdCount() - 1).next(rs); - SimpleBitSet waitingOnCommit = new SimpleBitSet(deps.txnIdCount(), false); - SimpleBitSet waitingOnApply = new SimpleBitSet(deps.txnIdCount(), false); - SimpleBitSet appliedOrInvalidated = new SimpleBitSet(deps.txnIdCount(), false); + int txnIdCount = deps.rangeDeps.txnIdCount(); + int keyCount = deps.keyDeps.keys().size(); + int[] selected = Gens.arrays(Gens.ints().between(0, txnIdCount + keyCount - 1)).unique().ofSizeBetween(0, txnIdCount + keyCount).next(rs); + SimpleBitSet waitingOn = new SimpleBitSet(txnIdCount + keyCount, false); + SimpleBitSet appliedOrInvalidated = rs.nextBoolean() ? null : new SimpleBitSet(txnIdCount, false); for (int i : selected) { - WaitingOnSets set = sets.next(rs); + WaitingOnSets set = appliedOrInvalidated == null || i >= txnIdCount ? WaitingOnSets.APPLY : sets.next(rs); switch (set) { - case COMMIT: - waitingOnCommit.set(i); - break; case APPLY: - waitingOnApply.set(i); + waitingOn.set(i); break; - case APPLYED_OR_INVALIDATED: + case APPLIED_OR_INVALIDATED: appliedOrInvalidated.set(i); break; default: @@ -98,7 +99,7 @@ public class WaitingOnSerializerTest } } - return new Command.WaitingOn(deps, Utils.ensureImmutable(waitingOnCommit), Utils.ensureImmutable(waitingOnApply), Utils.ensureImmutable(appliedOrInvalidated)); + return new Command.WaitingOn(deps.keyDeps.keys(), deps.rangeDeps.txnIds(), Utils.ensureImmutable(waitingOn), Utils.ensureImmutable(appliedOrInvalidated)); }; } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org