This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new d4c308a8a6 perf improvements
d4c308a8a6 is described below
commit d4c308a8a68ff892e4bbebf673b3d924a5313e52
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Feb 28 15:50:04 2024 +0000
perf improvements
---
modules/accord | 2 +-
.../org/apache/cassandra/config/AccordSpec.java | 4 +-
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 22 +++
.../db/compaction/CompactionIterator.java | 6 +-
src/java/org/apache/cassandra/net/Verb.java | 32 +--
.../service/accord/AccordCachingState.java | 4 +-
.../service/accord/AccordCommandStore.java | 20 +-
.../service/accord/AccordCommandStores.java | 8 +-
.../service/accord/AccordConfiguration.java | 1 +
.../cassandra/service/accord/AccordJournal.java | 15 ++
.../cassandra/service/accord/AccordKeyspace.java | 19 +-
.../service/accord/AccordObjectSizes.java | 29 +--
.../service/accord/AccordSafeCommandStore.java | 50 +++--
.../service/accord/AccordSafeCommandsForKey.java | 13 +-
.../cassandra/service/accord/AccordSafeState.java | 2 +
.../service/accord/AccordSafeTimestampsForKey.java | 9 +-
.../cassandra/service/accord/AccordService.java | 8 +-
.../service/accord/CommandsForRanges.java | 11 +-
.../cassandra/service/accord/api/AccordAgent.java | 3 +
.../service/accord/async/AsyncLoader.java | 12 +-
.../service/accord/async/AsyncOperation.java | 5 +-
.../service/accord/interop/AccordInteropApply.java | 2 -
.../serializers/CommandsForKeySerializer.java | 217 +++++++++++----------
.../accord/serializers/WaitingOnSerializer.java | 103 +++++++---
.../cassandra/service/accord/txn/TxnWrite.java | 5 +-
.../apache/cassandra/utils/vint/VIntCoding.java | 47 ++++-
test/conf/logback-dtest-quiet.xml | 56 ++++++
.../cassandra/distributed/api/ICoordinator.java | 3 +
.../cassandra/distributed/impl/Coordinator.java | 25 +++
.../cassandra/distributed/impl/Instance.java | 2 +-
.../distributed/test/accord/AccordLoadTest.java | 96 +++++++++
.../compaction/CompactionAccordIteratorsTest.java | 2 +-
.../service/accord/AccordCommandStoreTest.java | 29 +--
.../service/accord/AccordCommandTest.java | 8 +-
.../cassandra/service/accord/AccordTestUtils.java | 12 +-
.../service/accord/async/AsyncLoaderTest.java | 9 +-
.../service/accord/async/AsyncOperationTest.java | 11 +-
.../serializers/CommandsForKeySerializerTest.java | 39 ++--
.../serializers/WaitingOnSerializerTest.java | 33 ++--
40 files changed, 654 insertions(+), 321 deletions(-)
diff --git a/modules/accord b/modules/accord
index ef36616441..3562bb3c9c 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit ef36616441bd4ff4fec5379d986c75ad5a62ff7d
+Subproject commit 3562bb3c9ce4e9eecdf65e236e968ef3ee9e0a86
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 697d7edc1e..e76745a233 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -47,9 +47,9 @@ public class AccordSpec
public volatile DurationSpec fast_path_update_delay = new
DurationSpec.IntSecondsBound(5);
- public volatile DurationSpec schedule_durability_frequency = new
DurationSpec.IntSecondsBound(15);
+ public volatile DurationSpec schedule_durability_frequency = new
DurationSpec.IntSecondsBound(5);
public volatile DurationSpec durability_txnid_lag = new
DurationSpec.IntSecondsBound(5);
- public volatile DurationSpec shard_durability_cycle = new
DurationSpec.IntMinutesBound(2);
+ public volatile DurationSpec shard_durability_cycle = new
DurationSpec.IntMinutesBound(1);
public volatile DurationSpec global_durability_cycle = new
DurationSpec.IntMinutesBound(10);
public enum TransactionalRangeMigration
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 338ae13981..8a7a9d7d35 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -480,6 +480,7 @@ public class Config
public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;
public DataStorageSpec.LongMebibytesBound paxos_cache_size = null;
+ public DataStorageSpec.LongMebibytesBound accord_cache_size = null;
public DataStorageSpec.LongMebibytesBound consensus_migration_cache_size =
null;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3922a94c93..7273f3c5c7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -210,6 +210,7 @@ public class DatabaseDescriptor
private static long keyCacheSizeInMiB;
private static long paxosCacheSizeInMiB;
+ private static long accordCacheSizeInMiB;
private static long consensusMigrationCacheSizeInMiB;
private static long counterCacheSizeInMiB;
private static long indexSummaryCapacityInMiB;
@@ -894,6 +895,22 @@ public class DatabaseDescriptor
+ conf.paxos_cache_size + "',
supported values are <integer> >= 0.", false);
}
+ try
+ {
+ // if accord_cache_size is not set (null) then the size of the cache
defaults to "max(10% of Heap (in MiB), 1MiB)"
+ accordCacheSizeInMiB = (conf.accord_cache_size == null)
+ ? Math.max(1, (int)
((Runtime.getRuntime().totalMemory() * 0.10) / 1024 / 1024))
+ : conf.accord_cache_size.toMebibytes();
+
+ if (accordCacheSizeInMiB < 0)
+ throw new NumberFormatException(); // to escape duplicating
error message
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("accord_cache_size option was set
incorrectly to '"
+ + conf.accord_cache_size + "',
supported values are <integer> >= 0.", false);
+ }
+
try
{
// if consensusMigrationCacheSizeInMiB option was set to "auto"
then size of the cache should be "min(1% of Heap (in MB), 50MB)
@@ -3863,6 +3880,11 @@ public class DatabaseDescriptor
return paxosCacheSizeInMiB;
}
+ public static long getAccordCacheSizeInMiB()
+ {
+ return accordCacheSizeInMiB;
+ }
+
public static long getConsensusMigrationCacheSizeInMiB()
{
return consensusMigrationCacheSizeInMiB;
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index bb404bbe5a..e0dcb5682f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -978,11 +978,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
if (redundantBeforeEntry == null)
return row;
- TxnId redundantBeforeTxnId =
redundantBeforeEntry.shardRedundantBefore();
- if (redundantBeforeTxnId.equals(TxnId.NONE))
- return row;
-
- return
CommandsForKeysAccessor.withoutRedundantCommands(partitionKey, row,
redundantBeforeTxnId);
+ return
CommandsForKeysAccessor.withoutRedundantCommands(partitionKey, row,
redundantBeforeEntry);
}
@Override
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index 457bd14c80..4fed5c618f 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -266,7 +266,7 @@ public enum Verb
PAXOS2_PREPARE_REQ (40, P2, writeTimeout, MUTATION,
() -> PaxosPrepare.requestSerializer, () ->
PaxosPrepare.requestHandler, PAXOS2_PREPARE_RSP
),
PAXOS2_PREPARE_REFRESH_RSP (51, P2, writeTimeout, REQUEST_RESPONSE,
() -> PaxosPrepareRefresh.responseSerializer, RESPONSE_HANDLER
),
PAXOS2_PREPARE_REFRESH_REQ (41, P2, writeTimeout, MUTATION,
() -> PaxosPrepareRefresh.requestSerializer, () ->
PaxosPrepareRefresh.requestHandler,
PAXOS2_PREPARE_REFRESH_RSP ),
- PAXOS2_PROPOSE_RSP (52, P2, writeTimeout, REQUEST_RESPONSE,
() -> PaxosPropose.ACCEPT_RESULT_SERIALIZER, RESPONSE_HANDLER
),
+ PAXOS2_PROPOSE_RSP (52, P2, writeTimeout, REQUEST_RESPONSE,
() -> PaxosPropose.ACCEPT_RESULT_SERIALIZER, RESPONSE_HANDLER
),
PAXOS2_PROPOSE_REQ (42, P2, writeTimeout, MUTATION,
() -> PaxosPropose.requestSerializer, () ->
PaxosPropose.requestHandler, PAXOS2_PROPOSE_RSP
),
PAXOS2_COMMIT_AND_PREPARE_RSP (53, P2, writeTimeout, REQUEST_RESPONSE,
() -> PaxosPrepare.responseSerializer, RESPONSE_HANDLER
),
PAXOS2_COMMIT_AND_PREPARE_REQ (43, P2, writeTimeout, MUTATION,
() -> PaxosCommitAndPrepare.requestSerializer, () ->
PaxosCommitAndPrepare.requestHandler,
PAXOS2_COMMIT_AND_PREPARE_RSP ),
@@ -305,41 +305,41 @@ public enum Verb
DATA_MOVEMENT_EXECUTED_REQ (817, P1, rpcTimeout, MISC, () ->
DataMovement.Status.serializer, () -> DataMovements.instance,
DATA_MOVEMENT_EXECUTED_RSP ),
// accord
- ACCORD_SIMPLE_RSP (119, P2, writeTimeout, REQUEST_RESPONSE,
() -> EnumSerializer.simpleReply, RESPONSE_HANDLER
),
- ACCORD_PRE_ACCEPT_RSP (120, P2, writeTimeout, REQUEST_RESPONSE,
() -> PreacceptSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_SIMPLE_RSP (119, P2, writeTimeout, IMMEDIATE,
() -> EnumSerializer.simpleReply, RESPONSE_HANDLER
),
+ ACCORD_PRE_ACCEPT_RSP (120, P2, writeTimeout, IMMEDIATE,
() -> PreacceptSerializers.reply, RESPONSE_HANDLER
),
ACCORD_PRE_ACCEPT_REQ (121, P2, writeTimeout, IMMEDIATE,
() -> PreacceptSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_PRE_ACCEPT_RSP ),
- ACCORD_ACCEPT_RSP (122, P2, writeTimeout, REQUEST_RESPONSE,
() -> AcceptSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_ACCEPT_RSP (122, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.reply, RESPONSE_HANDLER
),
ACCORD_ACCEPT_REQ (123, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_ACCEPT_RSP ),
ACCORD_ACCEPT_INVALIDATE_REQ (124, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.invalidate, AccordService::verbHandlerOrNoop,
ACCORD_ACCEPT_RSP ),
- ACCORD_READ_RSP (125, P2, writeTimeout, REQUEST_RESPONSE,
() -> ReadDataSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_READ_RSP (125, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.reply, RESPONSE_HANDLER
),
ACCORD_READ_REQ (126, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.readData, AccordService::verbHandlerOrNoop,
ACCORD_READ_RSP ),
ACCORD_COMMIT_REQ (127, P2, writeTimeout, IMMEDIATE,
() -> CommitSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_READ_RSP ),
ACCORD_COMMIT_INVALIDATE_REQ (128, P2, writeTimeout, IMMEDIATE,
() -> CommitSerializers.invalidate, AccordService::verbHandlerOrNoop
),
- ACCORD_APPLY_RSP (129, P2, writeTimeout, REQUEST_RESPONSE,
() -> ApplySerializers.reply, RESPONSE_HANDLER
),
- ACCORD_APPLY_REQ (130, P2, writeTimeout, IMMEDIATE, () ->
ApplySerializers.request, AccordService::verbHandlerOrNoop, ACCORD_APPLY_RSP
),
- ACCORD_BEGIN_RECOVER_RSP (131, P2, writeTimeout, REQUEST_RESPONSE,
() -> RecoverySerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_APPLY_RSP (129, P2, writeTimeout, IMMEDIATE,
() -> ApplySerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_APPLY_REQ (130, P2, writeTimeout, IMMEDIATE,
() -> ApplySerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_APPLY_RSP ),
+ ACCORD_BEGIN_RECOVER_RSP (131, P2, writeTimeout, IMMEDIATE,
() -> RecoverySerializers.reply, RESPONSE_HANDLER
),
ACCORD_BEGIN_RECOVER_REQ (132, P2, writeTimeout, IMMEDIATE,
() -> RecoverySerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_BEGIN_RECOVER_RSP ),
- ACCORD_BEGIN_INVALIDATE_RSP (133, P2, writeTimeout, REQUEST_RESPONSE,
() -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_BEGIN_INVALIDATE_RSP (133, P2, writeTimeout, IMMEDIATE,
() -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER
),
ACCORD_BEGIN_INVALIDATE_REQ (134, P2, writeTimeout, IMMEDIATE,
() -> BeginInvalidationSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_BEGIN_INVALIDATE_RSP ),
- ACCORD_WAIT_ON_COMMIT_RSP (136, P2, writeTimeout, REQUEST_RESPONSE,
() -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER
),
+ ACCORD_WAIT_ON_COMMIT_RSP (136, P2, writeTimeout, IMMEDIATE,
() -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER
),
ACCORD_WAIT_ON_COMMIT_REQ (135, P2, writeTimeout, IMMEDIATE,
() -> WaitOnCommitSerializer.request, AccordService::verbHandlerOrNoop,
ACCORD_WAIT_ON_COMMIT_RSP ),
ACCORD_WAIT_UNTIL_APPLIED_REQ (137, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.waitUntilApplied, AccordService::verbHandlerOrNoop,
ACCORD_READ_RSP ),
ACCORD_INFORM_OF_TXN_REQ (138, P2, writeTimeout, IMMEDIATE,
() -> InformOfTxnIdSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_SIMPLE_RSP ),
ACCORD_INFORM_HOME_DURABLE_REQ (139, P2, writeTimeout, IMMEDIATE,
() -> InformHomeDurableSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_SIMPLE_RSP ),
ACCORD_INFORM_DURABLE_REQ (140, P2, writeTimeout, IMMEDIATE,
() -> InformDurableSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_SIMPLE_RSP ),
- ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, REQUEST_RESPONSE,
() -> CheckStatusSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, IMMEDIATE,
() -> CheckStatusSerializers.reply, RESPONSE_HANDLER
),
ACCORD_CHECK_STATUS_REQ (142, P2, writeTimeout, IMMEDIATE,
() -> CheckStatusSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_CHECK_STATUS_RSP ),
- ACCORD_GET_DEPS_RSP (143, P2, writeTimeout, REQUEST_RESPONSE,
() -> GetDepsSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_GET_DEPS_RSP (143, P2, writeTimeout, IMMEDIATE,
() -> GetDepsSerializers.reply, RESPONSE_HANDLER
),
ACCORD_GET_DEPS_REQ (144, P2, writeTimeout, IMMEDIATE,
() -> GetDepsSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_GET_DEPS_RSP ),
- ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, REQUEST_RESPONSE,
() -> GetEphmrlReadDepsSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, IMMEDIATE,
() -> GetEphmrlReadDepsSerializers.reply, RESPONSE_HANDLER
),
ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE,
() -> GetEphmrlReadDepsSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_GET_EPHMRL_READ_DEPS_RSP),
- ACCORD_GET_MAX_CONFLICT_RSP (163, P2, writeTimeout, REQUEST_RESPONSE,
() -> GetMaxConflictSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_GET_MAX_CONFLICT_RSP (163, P2, writeTimeout, IMMEDIATE,
() -> GetMaxConflictSerializers.reply, RESPONSE_HANDLER
),
ACCORD_GET_MAX_CONFLICT_REQ (164, P2, writeTimeout, IMMEDIATE,
() -> GetMaxConflictSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_GET_MAX_CONFLICT_RSP),
- ACCORD_FETCH_DATA_RSP (145, P2, repairTimeout,REQUEST_RESPONSE,
() -> FetchSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_FETCH_DATA_RSP (145, P2, repairTimeout,IMMEDIATE,
() -> FetchSerializers.reply, RESPONSE_HANDLER
),
ACCORD_FETCH_DATA_REQ (146, P2, repairTimeout,IMMEDIATE,
() -> FetchSerializers.request, AccordService::verbHandlerOrNoop,
ACCORD_FETCH_DATA_RSP ),
ACCORD_SET_SHARD_DURABLE_REQ (147, P2, writeTimeout, IMMEDIATE,
() -> SetDurableSerializers.shardDurable, AccordService::verbHandlerOrNoop,
ACCORD_SIMPLE_RSP ),
ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, IMMEDIATE,
() -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop,
ACCORD_SIMPLE_RSP ),
- ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, REQUEST_RESPONSE,
() -> QueryDurableBeforeSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE,
() -> QueryDurableBeforeSerializers.reply, RESPONSE_HANDLER
),
ACCORD_QUERY_DURABLE_BEFORE_REQ (150, P2, writeTimeout, IMMEDIATE,
() -> QueryDurableBeforeSerializers.request,AccordService::verbHandlerOrNoop,
ACCORD_QUERY_DURABLE_BEFORE_RSP ),
ACCORD_SYNC_NOTIFY_REQ (151, P2, writeTimeout, IMMEDIATE,
() -> Notification.listSerializer, () ->
AccordSyncPropagator.verbHandler, ACCORD_SIMPLE_RSP ),
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
index 5175e86c10..d07dcfc87b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
@@ -123,7 +123,7 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
int estimatedSizeOnHeap(ToLongFunction<V> estimator)
{
- shouldUpdateSize = false;
+ shouldUpdateSize = false; // TODO (expected): probably not the
safest place to clear need to compute size
return lastQueriedEstimatedSizeOnHeap = Ints.checkedCast(EMPTY_SIZE +
estimateStateOnHeapSize(estimator));
}
@@ -204,7 +204,7 @@ public class AccordCachingState<K, V> extends
IntrusiveLinkedListNode
protected State<K, V> state(State<K, V> next)
{
State<K, V> prev = state;
- if (prev != next)
+ if (prev != next) // TODO (expected): we change state to transition
the cache state machine but often keep payload the same - so shouldn't recompute
shouldUpdateSize = true;
return state = next;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 36b224b022..4b0722606b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -42,7 +42,7 @@ import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
import accord.impl.CommandsSummary;
import accord.impl.TimestampsForKey;
import accord.local.Command;
@@ -114,8 +114,8 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
private final ExecutionOrder executionOrder;
private final AccordStateCache stateCache;
private final AccordStateCache.Instance<TxnId, Command, AccordSafeCommand>
commandCache;
- private final AccordStateCache.Instance<RoutableKey, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsForKeyCache;
- private final AccordStateCache.Instance<RoutableKey, CommandsForKey,
AccordSafeCommandsForKey> commandsForKeyCache;
+ private final AccordStateCache.Instance<Key, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsForKeyCache;
+ private final AccordStateCache.Instance<Key, CommandsForKey,
AccordSafeCommandsForKey> commandsForKeyCache;
private AsyncOperation<?> currentOperation = null;
private AccordSafeCommandStore current = null;
private long lastSystemTimestampMicros = Long.MIN_VALUE;
@@ -161,7 +161,7 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
this::validateCommand,
AccordObjectSizes::command);
timestampsForKeyCache =
- stateCache.instance(RoutableKey.class,
+ stateCache.instance(Key.class,
AccordSafeTimestampsForKey.class,
AccordSafeTimestampsForKey::new,
this::loadTimestampsForKey,
@@ -169,7 +169,7 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
this::validateTimestampsForKey,
AccordObjectSizes::timestampsForKey);
commandsForKeyCache =
- stateCache.instance(RoutableKey.class,
+ stateCache.instance(Key.class,
AccordSafeCommandsForKey.class,
AccordSafeCommandsForKey::new,
this::loadCommandsForKey,
@@ -220,7 +220,7 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
MessageProvider messageProvider =
journal.makeMessageProvider(txnId);
- SerializerSupport.TxnAndDeps txnAndDeps =
SerializerSupport.extractTxnAndDeps(status, accepted, messageProvider);
+ SerializerSupport.TxnAndDeps txnAndDeps =
SerializerSupport.extractTxnAndDeps(unsafeRangesForEpoch(), status, accepted,
messageProvider);
Seekables<?, ?> keys = txnAndDeps.txn.keys();
if (keys.domain() != Routable.Domain.Range)
throw new AssertionError(String.format("Txn keys are not
range for %s", txnAndDeps.txn));
@@ -311,12 +311,12 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
return commandCache;
}
- public AccordStateCache.Instance<RoutableKey, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsForKeyCache()
+ public AccordStateCache.Instance<Key, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsForKeyCache()
{
return timestampsForKeyCache;
}
- public AccordStateCache.Instance<RoutableKey, CommandsForKey,
AccordSafeCommandsForKey> commandsForKeyCache()
+ public AccordStateCache.Instance<Key, CommandsForKey,
AccordSafeCommandsForKey> commandsForKeyCache()
{
return commandsForKeyCache;
}
@@ -466,8 +466,8 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext,
Map<TxnId, AccordSafeCommand>
commands,
- NavigableMap<RoutableKey,
AccordSafeTimestampsForKey> timestampsForKeys,
- NavigableMap<RoutableKey,
AccordSafeCommandsForKey> commandsForKeys)
+ NavigableMap<Key,
AccordSafeTimestampsForKey> timestampsForKeys,
+ NavigableMap<Key,
AccordSafeCommandsForKey> commandsForKeys)
{
Invariants.checkState(current == null);
commands.values().forEach(AccordSafeState::preExecute);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index cf4cda992b..0fd719d5fb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -31,6 +31,7 @@ import accord.primitives.Range;
import accord.topology.Topology;
import accord.utils.RandomSource;
import org.apache.cassandra.cache.CacheSize;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.AccordStateCacheMetrics;
import org.apache.cassandra.metrics.CacheSizeMetrics;
import org.apache.cassandra.schema.TableId;
@@ -47,7 +48,7 @@ public class AccordCommandStores extends CommandStores
implements CacheSize
ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory, AccordJournal journal)
{
super(time, agent, store, random, shardDistributor,
progressLogFactory, AccordCommandStore.factory(journal, new
AccordStateCacheMetrics(ACCORD_STATE_CACHE)));
- setCapacity(maxCacheSize());
+ setCapacity(DatabaseDescriptor.getAccordCacheSizeInMiB() << 20);
this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this);
}
@@ -110,11 +111,6 @@ public class AccordCommandStores extends CommandStores
implements CacheSize
forEach(commandStore -> ((AccordSafeCommandStore)
commandStore).commandStore().setCapacity(perStore));
}
- private static long maxCacheSize()
- {
- return 5 << 20; // TODO (required): make configurable
- }
-
@Override
public synchronized Supplier<EpochReady> updateTopology(Node node,
Topology newTopology, boolean startSync)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java
b/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java
index a17a9fc844..d87e6c96cd 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import accord.config.LocalConfig;
import org.apache.cassandra.config.Config;
+// TODO (expected): should this be merged with AccordSpec?
public class AccordConfiguration implements LocalConfig
{
private final Config config;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index ae202d2f63..26e09ada14 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -570,6 +571,7 @@ public class AccordJournal implements Shutdownable
Key(Timestamp timestamp, Type type)
{
+ if (timestamp == null) throw new NullPointerException("Null
timestamp for type " + type);
this.timestamp = timestamp;
this.type = type;
}
@@ -1361,6 +1363,19 @@ public class AccordJournal implements Shutdownable
return presentMessages;
}
+ public Set<MessageType> all()
+ {
+ Set<Type> types = EnumSet.allOf(Type.class);
+ Set<Key> keys = new ObjectHashSet<>(types.size() + 1, 0.9f);
+ for (Type type : types)
+ keys.add(new Key(txnId, type));
+ Set<Key> presentKeys = journal.test(keys);
+ Set<MessageType> presentMessages = new
ObjectHashSet<>(presentKeys.size() + 1, 0.9f);
+ for (Key key : presentKeys)
+ presentMessages.add(key.type.outgoingType);
+ return presentMessages;
+ }
+
@Override
public PreAccept preAccept()
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 2684be683f..535bc4948f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -45,7 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Key;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
import accord.impl.TimestampsForKey;
import accord.local.Command;
import accord.local.Command.WaitingOn;
@@ -592,7 +592,7 @@ public class AccordKeyspace
}
// TODO (expected): garbage-free filtering, reusing encoding
- public Row withoutRedundantCommands(PartitionKey key, Row row, TxnId
redundantBefore)
+ public Row withoutRedundantCommands(PartitionKey key, Row row,
RedundantBefore.Entry redundantBefore)
{
Invariants.checkState(row.columnCount() == 1);
Cell<?> cell = row.getCell(data);
@@ -600,7 +600,10 @@ public class AccordKeyspace
return row;
CommandsForKey current = CommandsForKeySerializer.fromBytes(key,
cell.buffer());
- CommandsForKey updated = current.withoutRedundant(redundantBefore);
+ if (current == null)
+ return null;
+
+ CommandsForKey updated =
current.withRedundantBefore(redundantBefore);
if (current == updated)
return row;
@@ -822,7 +825,7 @@ public class AccordKeyspace
Command.Committed committed = command.asCommitted();
Command.Committed originalCommitted = original != null &&
original.isCommitted() ? original.asCommitted() : null;
if (originalCommitted == null || committed.waitingOn !=
originalCommitted.waitingOn)
- builder.addCell(live(CommandsColumns.waiting_on,
timestampMicros, WaitingOnSerializer.serialize(committed.waitingOn)));
+ builder.addCell(live(CommandsColumns.waiting_on,
timestampMicros, WaitingOnSerializer.serialize(committed.txnId(),
committed.waitingOn)));
}
Row row = builder.build();
@@ -1189,10 +1192,10 @@ public class AccordKeyspace
Ballot promised = deserializePromisedOrNull(row);
Ballot accepted = deserializeAcceptedOrNull(row);
- WaitingOnProvider waitingOn = deserializeWaitingOn(row);
+ WaitingOnProvider waitingOn = deserializeWaitingOn(txnId, row);
MessageProvider messages = commandStore.makeMessageProvider(txnId);
- return SerializerSupport.reconstruct(attrs, status, executeAt,
promised, accepted, waitingOn, messages);
+ return
SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs,
status, executeAt, promised, accepted, waitingOn, messages);
}
catch (Throwable t)
{
@@ -1270,7 +1273,7 @@ public class AccordKeyspace
return deserializeTimestampOrNull(row.getBlob("accepted_ballot"),
Ballot::fromBits);
}
- private static WaitingOnProvider deserializeWaitingOn(UntypedResultSet.Row
row)
+ private static WaitingOnProvider deserializeWaitingOn(TxnId txnId,
UntypedResultSet.Row row)
{
ByteBuffer bytes = row.getBlob("waiting_on");
@@ -1284,7 +1287,7 @@ public class AccordKeyspace
try
{
- return WaitingOnSerializer.deserialize(deps, bytes);
+ return WaitingOnSerializer.deserialize(txnId,
deps.keyDeps.keys(), deps.rangeDeps.txnIds(), bytes);
}
catch (IOException e)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index a1704ba52b..7346a6eebf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -24,11 +24,12 @@ import java.util.function.ToLongFunction;
import accord.api.Key;
import accord.api.Result;
import accord.api.RoutingKey;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.Info;
+import accord.local.CommandsForKey;
+import accord.local.CommandsForKey.TxnInfo;
import accord.impl.TimestampsForKey;
import accord.local.Command;
import accord.local.Command.WaitingOn;
+import accord.local.CommandsForKey.TxnInfoWithMissing;
import accord.local.CommonAttributes;
import accord.local.Node;
import accord.local.SaveStatus;
@@ -305,7 +306,6 @@ public class AccordObjectSizes
return ACCEPTED;
case Committed:
case Stable:
- case ReadyToExecute:
return COMMITTED;
case PreApplied:
case Applied:
@@ -363,22 +363,25 @@ public class AccordObjectSizes
}
private static long EMPTY_CFK_SIZE = measure(new CommandsForKey(null));
- private static long EMPTY_INFO_SIZE =
measure(CommandsForKey.Info.createMock(null, null, null));
+ private static long EMPTY_INFO_SIZE =
measure(TxnInfo.createMock(TxnId.NONE, null, null, null));
+ private static long EMPTY_INFO_WITH_MISSING_ADDITIONAL_SIZE =
measure(TxnInfo.createMock(TxnId.NONE, null, null, null)) - EMPTY_INFO_SIZE;
public static long commandsForKey(CommandsForKey cfk)
{
long size = EMPTY_CFK_SIZE;
size += key(cfk.key());
- size += 2 * ObjectSizes.sizeOfReferenceArray(cfk.size());
- size += cfk.size() * TIMESTAMP_SIZE;
+ size += ObjectSizes.sizeOfReferenceArray(cfk.size());
+ size += cfk.size() * EMPTY_INFO_SIZE;
for (int i = 0 ; i < cfk.size() ; ++i)
{
- Info info = cfk.info(i);
- if (info.getClass() == CommandsForKey.NoInfo.class)
- continue;
-
- size += EMPTY_INFO_SIZE;
- if (info.missing.length > 0)
- size += ObjectSizes.sizeOfReferenceArray(info.missing.length);
+ TxnInfo info = cfk.get(i);
+ if (info.getClass() != TxnInfoWithMissing.class) continue;
+ TxnInfoWithMissing infoWithMissing = (TxnInfoWithMissing) info;
+ if (infoWithMissing.missing.length > 0)
+ {
+ size += EMPTY_INFO_WITH_MISSING_ADDITIONAL_SIZE;
+ size +=
ObjectSizes.sizeOfReferenceArray(infoWithMissing.missing.length);
+ size += infoWithMissing.missing.length * TIMESTAMP_SIZE;
+ }
}
return size;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index c1b09ccb27..1c497cc5c9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -31,8 +31,7 @@ import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
import accord.impl.AbstractSafeCommandStore;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKeys;
+import accord.local.CommandsForKey;
import accord.impl.CommandsSummary;
import accord.local.Command;
import accord.local.CommandStores.RangesForEpoch;
@@ -41,7 +40,6 @@ import accord.local.PreLoadContext;
import accord.primitives.AbstractKeys;
import accord.primitives.Deps;
import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
import accord.primitives.Routables;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
@@ -53,16 +51,16 @@ import static accord.primitives.Routable.Domain.Range;
public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey,
AccordSafeCommandsForKey>
{
private final Map<TxnId, AccordSafeCommand> commands;
- private final NavigableMap<RoutableKey, AccordSafeCommandsForKey>
commandsForKeys;
- private final NavigableMap<RoutableKey, AccordSafeTimestampsForKey>
timestampsForKeys;
+ private final NavigableMap<Key, AccordSafeCommandsForKey> commandsForKeys;
+ private final NavigableMap<Key, AccordSafeTimestampsForKey>
timestampsForKeys;
private final AccordCommandStore commandStore;
private final RangesForEpoch ranges;
CommandsForRanges.Updater rangeUpdates = null;
public AccordSafeCommandStore(PreLoadContext context,
Map<TxnId, AccordSafeCommand> commands,
- NavigableMap<RoutableKey,
AccordSafeTimestampsForKey> timestampsForKey,
- NavigableMap<RoutableKey,
AccordSafeCommandsForKey> commandsForKey,
+ NavigableMap<Key,
AccordSafeTimestampsForKey> timestampsForKey,
+ NavigableMap<Key, AccordSafeCommandsForKey>
commandsForKey,
AccordCommandStore commandStore)
{
super(context);
@@ -94,7 +92,7 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
}
@Override
- protected AccordSafeCommandsForKey getCommandsForKeyInternal(RoutableKey
key)
+ protected AccordSafeCommandsForKey getCommandsForKeyInternal(Key key)
{
return commandsForKeys.get(key);
}
@@ -106,7 +104,7 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
}
@Override
- protected AccordSafeCommandsForKey getCommandsForKeyIfLoaded(RoutableKey
key)
+ protected AccordSafeCommandsForKey getCommandsForKeyIfLoaded(Key key)
{
AccordSafeCommandsForKey cfk =
commandStore.commandsForKeyCache().acquireIfLoaded(key);
if (cfk != null) cfk.preExecute();
@@ -114,7 +112,7 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
}
@Override
- protected AccordSafeTimestampsForKey
getTimestampsForKeyInternal(RoutableKey key)
+ protected AccordSafeTimestampsForKey getTimestampsForKeyInternal(Key key)
{
return timestampsForKeys.get(key);
}
@@ -126,7 +124,7 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
}
@Override
- protected AccordSafeTimestampsForKey
getTimestampsForKeyIfLoaded(RoutableKey key)
+ protected AccordSafeTimestampsForKey getTimestampsForKeyIfLoaded(Key key)
{
AccordSafeTimestampsForKey cfk =
commandStore.timestampsForKeyCache().acquireIfLoaded(key);
if (cfk != null) cfk.preExecute();
@@ -177,14 +175,14 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
Ranges allRanges = ranges.all();
deps.keyDeps.keys().forEach(allRanges, key -> {
// TODO (now): batch register to minimise GC
- deps.keyDeps.forEach(key, txnId -> {
+ deps.keyDeps.forEach(key, (txnId, txnIdx) -> {
// TODO (desired, efficiency): this can be made more efficient
by batching by epoch
if (ranges.coordinates(txnId).contains(key))
return; // already coordinates, no need to replicate
if (!ranges.allBefore(txnId.epoch()).contains(key))
return;
- CommandsForKeys.registerNotWitnessed(this, key, txnId);
+ get(key).registerHistorical(this, txnId);
});
});
CommandsForRanges commandsForRanges = commandStore.commandsForRanges();
@@ -221,7 +219,7 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
for (Key key : keys)
{
if (!slice.contains(key)) continue;
- CommandsForKey commands = commandsForKey(key).current();
+ CommandsForKey commands = get(key).current();
accumulate = map.apply(commands, accumulate);
}
}
@@ -233,11 +231,11 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
Routables<?> sliced = keysOrRanges.slice(slice,
Routables.Slice.Minimal);
if (!context.keys().slice(slice,
Routables.Slice.Minimal).containsAll(sliced))
throw new AssertionError("Range(s) detected not present in
the PreLoadContext: expected " + context.keys() + " but given " + keysOrRanges);
- for (RoutableKey key : timestampsForKeys.keySet())
+ for (Key key : commandsForKeys.keySet())
{
//TODO (duplicate code): this is a repeat of Key... only
change is checking contains in range
if (!sliced.contains(key)) continue;
- CommandsForKey commands = commandsForKey(key).current();
+ CommandsForKey commands = get(key).current();
accumulate = map.apply(commands, accumulate);
}
}
@@ -263,22 +261,20 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
}
@Override
- protected void update(Command prev, Command updated, @Nullable
Seekables<?, ?> keysOrRanges)
+ protected void update(Command prev, Command updated)
{
- super.update(prev, updated, keysOrRanges);
+ super.update(prev, updated);
if (updated.txnId().domain() == Range &&
CommandsForKey.needsUpdate(prev, updated))
{
+ Seekables<?, ?> keysOrRanges = updated.keysOrRanges();
+ if (keysOrRanges == null) keysOrRanges = prev.keysOrRanges();
if (keysOrRanges == null)
- {
- if (updated.known().isDefinitionKnown()) keysOrRanges =
updated.partialTxn().keys();
- else if (prev.known().isDefinitionKnown()) keysOrRanges =
prev.partialTxn().keys();
- else return;
- }
- List<TxnId> waitingOn;
+ return;
- if (updated.partialDeps() == null) waitingOn =
Collections.emptyList();
+ List<TxnId> waitingOn;
// TODO (required): this is faulty: we cannot simply save the raw
transaction ids, as they may be for other ranges
+ if (updated.partialDeps() == null) waitingOn =
Collections.emptyList();
else waitingOn = updated.partialDeps().txnIds();
updateRanges().put(updated.txnId(), (Ranges)keysOrRanges,
updated.saveStatus(), updated.executeAt(), waitingOn);
}
@@ -300,8 +296,8 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
}
public void postExecute(Map<TxnId, AccordSafeCommand> commands,
- Map<RoutableKey, AccordSafeTimestampsForKey>
timestampsForKey,
- Map<RoutableKey, AccordSafeCommandsForKey>
commandsForKeys
+ Map<Key, AccordSafeTimestampsForKey>
timestampsForKey,
+ Map<Key, AccordSafeCommandsForKey> commandsForKeys
)
{
postExecute();
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java
index 748143f333..808b4d4bc1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java
@@ -23,18 +23,17 @@ import java.util.Objects;
import com.google.common.annotations.VisibleForTesting;
import accord.api.Key;
-import accord.impl.CommandsForKey;
-import accord.impl.SafeCommandsForKey;
-import accord.primitives.RoutableKey;
+import accord.local.CommandsForKey;
+import accord.local.SafeCommandsForKey;
-public class AccordSafeCommandsForKey extends SafeCommandsForKey implements
AccordSafeState<RoutableKey, CommandsForKey>
+public class AccordSafeCommandsForKey extends SafeCommandsForKey implements
AccordSafeState<Key, CommandsForKey>
{
private boolean invalidated;
- private final AccordCachingState<RoutableKey, CommandsForKey> global;
+ private final AccordCachingState<Key, CommandsForKey> global;
private CommandsForKey original;
private CommandsForKey current;
- public AccordSafeCommandsForKey(AccordCachingState<RoutableKey,
CommandsForKey> global)
+ public AccordSafeCommandsForKey(AccordCachingState<Key, CommandsForKey>
global)
{
super((Key) global.key());
this.global = global;
@@ -83,7 +82,7 @@ public class AccordSafeCommandsForKey extends
SafeCommandsForKey implements Acco
}
@Override
- public AccordCachingState<RoutableKey, CommandsForKey> global()
+ public AccordCachingState<Key, CommandsForKey> global()
{
checkNotInvalidated();
return global;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeState.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeState.java
index b742efb9d1..374968bcfb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeState.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeState.java
@@ -25,6 +25,8 @@ public interface AccordSafeState<K, V> extends SafeState<V>
{
void set(V update);
V original();
+ void invalidate();
+ boolean invalidated();
void preExecute();
void postExecute();
AccordCachingState<K, V> global();
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java
index b5b44a7703..a4c48c83e6 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java
@@ -26,17 +26,16 @@ import com.google.common.annotations.VisibleForTesting;
import accord.api.Key;
import accord.impl.SafeTimestampsForKey;
import accord.impl.TimestampsForKey;
-import accord.primitives.RoutableKey;
import accord.primitives.Timestamp;
-public class AccordSafeTimestampsForKey extends SafeTimestampsForKey
implements AccordSafeState<RoutableKey, TimestampsForKey>
+public class AccordSafeTimestampsForKey extends SafeTimestampsForKey
implements AccordSafeState<Key, TimestampsForKey>
{
private boolean invalidated;
- private final AccordCachingState<RoutableKey, TimestampsForKey> global;
+ private final AccordCachingState<Key, TimestampsForKey> global;
private TimestampsForKey original;
private TimestampsForKey current;
- public AccordSafeTimestampsForKey(AccordCachingState<RoutableKey,
TimestampsForKey> global)
+ public AccordSafeTimestampsForKey(AccordCachingState<Key,
TimestampsForKey> global)
{
super((Key) global.key());
this.global = global;
@@ -71,7 +70,7 @@ public class AccordSafeTimestampsForKey extends
SafeTimestampsForKey implements
}
@Override
- public AccordCachingState<RoutableKey, TimestampsForKey> global()
+ public AccordCachingState<Key, TimestampsForKey> global()
{
checkNotInvalidated();
return global;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 2f790e80b7..290541adec 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -123,7 +123,7 @@ public class AccordService implements IAccordService,
Shutdownable
private final AccordDataStore dataStore;
private final AccordJournal journal;
private final CoordinateDurabilityScheduling durabilityScheduling;
- private final AccordVerbHandler<? extends Request> verbHandler;
+ private final AccordVerbHandler<? extends Request> requestHandler;
private final LocalConfig configuration;
@GuardedBy("this")
private State state = State.INIT;
@@ -292,7 +292,7 @@ public class AccordService implements IAccordService,
Shutdownable
configuration);
this.nodeShutdown = toShutdownable(node);
this.durabilityScheduling = new CoordinateDurabilityScheduling(node);
- this.verbHandler = new AccordVerbHandler<>(node, configService,
journal);
+ this.requestHandler = new AccordVerbHandler<>(node, configService,
journal);
}
@Override
@@ -309,14 +309,14 @@ public class AccordService implements IAccordService,
Shutdownable
durabilityScheduling.setShardCycleTime(Ints.checkedCast(DatabaseDescriptor.getAccordShardDurabilityCycle(SECONDS)),
SECONDS);
durabilityScheduling.setTxnIdLag(Ints.checkedCast(DatabaseDescriptor.getAccordScheduleDurabilityTxnIdLag(SECONDS)),
TimeUnit.SECONDS);
durabilityScheduling.setFrequency(Ints.checkedCast(DatabaseDescriptor.getAccordScheduleDurabilityFrequency(SECONDS)),
SECONDS);
- durabilityScheduling.start();
+// durabilityScheduling.start();
state = State.STARTED;
}
@Override
public IVerbHandler<? extends Request> verbHandler()
{
- return verbHandler;
+ return requestHandler;
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index acf342bf25..426ab4061b 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableSortedMap;
import accord.api.Key;
import accord.api.RoutingKey;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
import accord.impl.CommandsSummary;
import accord.local.Command;
import accord.local.SaveStatus;
@@ -64,7 +64,6 @@ import org.apache.cassandra.utils.IntervalTree;
import static accord.local.SafeCommandStore.*;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH;
-import static accord.local.SafeCommandStore.TestStartedAt.ANY;
import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS;
import static accord.local.Status.Stable;
@@ -372,11 +371,13 @@ public class CommandsForRanges
private static Range toRange(Interval<RoutableKey, RangeCommandSummary>
interval)
{
- TokenKey start = (TokenKey) interval.min;
- TokenKey end = (TokenKey) interval.max;
+ AccordRoutingKey start = (AccordRoutingKey) interval.min;
+ if (!(start instanceof AccordRoutingKey.SentinelKey))
+ start = new TokenKey(start.table(),
start.token().decreaseSlightly());
+ AccordRoutingKey end = (AccordRoutingKey) interval.max;
// TODO (required, correctness) : accord doesn't support wrap around,
so decreaseSlightly may fail in some cases
// TODO (required, correctness) : this logic is mostly used for
testing, so is it actually safe for all partitioners?
- return new
TokenRange(start.withToken(start.token().decreaseSlightly()), end);
+ return new TokenRange(start, end);
}
@Nullable
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index f9ab58387d..33f8f2b088 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -117,6 +117,9 @@ public class AccordAgent implements Agent
public boolean isExpired(TxnId initiated, long now)
{
// TODO: should distinguish between reads and writes
+ if (initiated.kind().isSyncPoint())
+ return false;
+
return now - initiated.hlc() > getReadRpcTimeout(MICROSECONDS);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index b8494e7d44..4b7607a36b 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -33,12 +33,12 @@ import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.Key;
import accord.api.RoutingKey;
import accord.local.KeyHistory;
import accord.local.PreLoadContext;
import accord.primitives.Range;
import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
import accord.utils.Invariants;
@@ -121,7 +121,7 @@ public class AsyncLoader
}
}
- private void referenceAndAssembleReadsForKey(RoutableKey key,
+ private void referenceAndAssembleReadsForKey(Key key,
AsyncOperation.Context
context,
List<AsyncChain<?>>
listenChains)
{
@@ -157,7 +157,7 @@ public class AsyncLoader
{
case Key:
// cast to Keys fails...
- Iterable<RoutableKey> keys = (Iterable<RoutableKey>)
keysOrRanges;
+ Iterable<Key> keys = (Iterable<Key>) keysOrRanges;
keys.forEach(key -> referenceAndAssembleReadsForKey(key,
context, chains));
break;
case Range:
@@ -172,7 +172,7 @@ public class AsyncLoader
private AsyncChain<?>
referenceAndDispatchReadsForRange(AsyncOperation.Context context)
{
- AsyncChain<Set<? extends RoutableKey>> overlappingKeys =
findOverlappingKeys((Ranges) keysOrRanges);
+ AsyncChain<Set<? extends Key>> overlappingKeys =
findOverlappingKeys((Ranges) keysOrRanges);
return overlappingKeys.flatMap(keys -> {
if (keys.isEmpty())
@@ -183,14 +183,14 @@ public class AsyncLoader
}, commandStore);
}
- private AsyncChain<Set<? extends RoutableKey>> findOverlappingKeys(Ranges
ranges)
+ private AsyncChain<Set<? extends Key>> findOverlappingKeys(Ranges ranges)
{
Invariants.checkArgument(!ranges.isEmpty());
List<AsyncChain<Set<PartitionKey>>> chains = new
ArrayList<>(ranges.size());
for (Range range : ranges)
chains.add(findOverlappingKeys(range));
- return AsyncChains.reduce(chains, (a, b) ->
ImmutableSet.<RoutableKey>builder().addAll(a).addAll(b).build());
+ return AsyncChains.reduce(chains, (a, b) ->
ImmutableSet.<Key>builder().addAll(a).addAll(b).build());
}
private AsyncChain<Set<PartitionKey>> findOverlappingKeys(Range range)
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index bef57bcf0b..f0c53e33d7 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import accord.api.Key;
import accord.local.CommandStore;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
@@ -67,8 +68,8 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
static class Context
{
final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
- final TreeMap<RoutableKey, AccordSafeTimestampsForKey>
timestampsForKey = new TreeMap<>();
- final TreeMap<RoutableKey, AccordSafeCommandsForKey> commandsForKey =
new TreeMap<>();
+ final TreeMap<Key, AccordSafeTimestampsForKey> timestampsForKey = new
TreeMap<>();
+ final TreeMap<Key, AccordSafeCommandsForKey> commandsForKey = new
TreeMap<>();
void releaseResources(AccordCommandStore commandStore)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index 7294dd2696..49c06e811c 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -132,7 +132,6 @@ public class AccordInteropApply extends Apply implements
Command.TransientListen
case PreCommitted:
case Committed:
case PreApplied:
- case ReadyToExecute:
synchronized (this)
{
waitingOn.set(safeStore.commandStore().id());
@@ -249,7 +248,6 @@ public class AccordInteropApply extends Apply implements
Command.TransientListen
case PreCommitted:
case Committed:
case PreApplied:
- case ReadyToExecute:
return;
case Applied:
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
index d5144cbe8d..dbe2f4845f 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
@@ -24,10 +24,10 @@ import java.util.Arrays;
import com.google.common.primitives.Ints;
import accord.api.Key;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.Info;
-import accord.impl.CommandsForKey.InternalStatus;
-import accord.impl.CommandsForKey.NoInfo;
+import accord.local.CommandsForKey;
+import accord.local.CommandsForKey.TxnInfo;
+import accord.local.CommandsForKey.InternalStatus;
+import accord.local.CommandsForKey.Unmanaged;
import accord.local.Node;
import accord.primitives.Routable.Domain;
import accord.primitives.Timestamp;
@@ -35,8 +35,10 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.utils.vint.VIntCoding;
+import static accord.local.CommandsForKey.NO_PENDING_UNMANAGED;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.primitives.Txn.Kind.Read;
import static accord.primitives.Txn.Kind.Write;
@@ -104,23 +106,7 @@ public class CommandsForKeySerializer
{
int commandCount = cfk.size();
if (commandCount == 0)
- {
- // TODO (expected): we should not need to special-case, but best
solution here is not to store redundantBefore;
- // but this requires some modest deeper changes, so for now
special-case serialization when empty
- Timestamp redundantBefore = cfk.redundantBefore();
- ByteBuffer out =
ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(0) +
-
TypeSizes.sizeofUnsignedVInt(redundantBefore.epoch()) +
-
TypeSizes.sizeofUnsignedVInt(redundantBefore.hlc()) +
-
TypeSizes.sizeofUnsignedVInt(redundantBefore.flags()) +
-
TypeSizes.sizeofUnsignedVInt(redundantBefore.node.id));
- VIntCoding.writeUnsignedVInt32(0, out);
- VIntCoding.writeUnsignedVInt(redundantBefore.epoch(), out);
- VIntCoding.writeUnsignedVInt(redundantBefore.hlc(), out);
- VIntCoding.writeUnsignedVInt32(redundantBefore.flags(), out);
- VIntCoding.writeUnsignedVInt32(redundantBefore.node.id, out);
- out.flip();
- return out;
- }
+ return ByteBuffer.allocate(1);
int[] nodeIds = cachedInts().getInts(Math.min(64, commandCount));
try
@@ -129,11 +115,9 @@ public class CommandsForKeySerializer
// whether we have any missing transactions to encode, any
executeAt that are not equal to their TxnId
// and whether there are any non-standard flag bits to encode
boolean hasNonStandardFlags = false;
- int nodeIdCount, missingIdCount = 0, executeAtCount = 0,
bitsPerExecuteAtFlags = 0;
+ int nodeIdCount = 0, missingIdCount = 0, executeAtCount = 0,
bitsPerExecuteAtFlags = 0;
int bitsPerExecuteAtEpochDelta = 0, bitsPerExecuteAtHlcDelta = 1;
// to permit us to use full 64 bits and encode in 5 bits we force at least one
hlc bit
{
- nodeIdCount = 1;
- nodeIds[0] = cfk.redundantBefore().node.id;
for (int i = 0 ; i < commandCount ; ++i)
{
if (nodeIdCount + 1 >= nodeIds.length)
@@ -143,24 +127,19 @@ public class CommandsForKeySerializer
nodeIds = cachedInts().resize(nodeIds,
nodeIds.length, nodeIds.length * 2);
}
- TxnId txnId = cfk.txnId(i);
- Info info = cfk.info(i);
+ TxnInfo txn = cfk.get(i);
- hasNonStandardFlags |= txnIdFlags(txnId) != STANDARD;
- nodeIds[nodeIdCount++] = txnId.node.id;
+ hasNonStandardFlags |= txnIdFlags(txn) != STANDARD;
+ nodeIds[nodeIdCount++] = txn.node.id;
- if (info.getClass() == NoInfo.class)
+ missingIdCount += txn.missing().length;
+ if (txn.executeAt == txn)
continue;
- missingIdCount += info.missing.length;
-
- if (info.executeAt == txnId)
- continue;
-
- nodeIds[nodeIdCount++] = info.executeAt.node.id;
- bitsPerExecuteAtEpochDelta =
Math.max(bitsPerExecuteAtEpochDelta,
numberOfBitsToRepresent(info.executeAt.epoch() - txnId.epoch()));
- bitsPerExecuteAtHlcDelta =
Math.max(bitsPerExecuteAtHlcDelta, numberOfBitsToRepresent(info.executeAt.hlc()
- txnId.hlc()));
- bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags,
numberOfBitsToRepresent(info.executeAt.flags()));
+ nodeIds[nodeIdCount++] = txn.executeAt.node.id;
+ bitsPerExecuteAtEpochDelta =
Math.max(bitsPerExecuteAtEpochDelta,
numberOfBitsToRepresent(txn.executeAt.epoch() - txn.epoch()));
+ bitsPerExecuteAtHlcDelta =
Math.max(bitsPerExecuteAtHlcDelta, numberOfBitsToRepresent(txn.executeAt.hlc()
- txn.hlc()));
+ bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags,
numberOfBitsToRepresent(txn.executeAt.flags()));
executeAtCount += 1;
}
nodeIdCount = compact(nodeIds);
@@ -175,8 +154,8 @@ public class CommandsForKeySerializer
int maxHeaderBits = minHeaderBits;
int totalBytes = 0;
- long prevEpoch = cfk.redundantBefore().epoch();
- long prevHlc = cfk.redundantBefore().hlc();
+ long prevEpoch = cfk.get(0).epoch();
+ long prevHlc = cfk.get(0).hlc();
int[] bytesHistogram = cachedInts().getInts(12);
Arrays.fill(bytesHistogram, 0);
for (int i = 0 ; i < commandCount ; ++i)
@@ -214,7 +193,7 @@ public class CommandsForKeySerializer
if (hasNonStandardFlags && txnIdFlags(txnId) == RAW)
totalBytes += 2;
- Info info = cfk.info(i);
+ TxnInfo info = cfk.get(i);
if (info.status.hasInfo)
headerBits += infoHeaderBits;
maxHeaderBits = Math.max(headerBits, maxHeaderBits);
@@ -247,9 +226,11 @@ public class CommandsForKeySerializer
// then pick third number as 75th %ile, but at least 1 less
than highest, and one more than second
// finally, ensure third then second are distributed so that
there is no more than a gap of 4 between them and the next
int l0 = Math.max(0, Math.min(3, minBasicBytes - headerBytes));
- int l1 = Math.max(l0+1,
Math.min(l0+4,Arrays.binarySearch(bytesHistogram, commandCount/4) -
headerBytes));
+ int l1 = Arrays.binarySearch(bytesHistogram, minBasicBytes,
maxBasicBytes, commandCount/4);
+ l1 = Math.max(l0+1, Math.min(l0+4, (l1 < 0 ? -1 - l1 : l1) -
headerBytes));
int l3 = Math.max(l1+2, maxBasicBytes - headerBytes);
- int l2 = Math.max(l1+1, Math.min(l3-1,
Arrays.binarySearch(bytesHistogram, (3*commandCount)/4) - headerBytes));
+ int l2 = Arrays.binarySearch(bytesHistogram, minBasicBytes,
maxBasicBytes,(3*commandCount)/4);
+ l2 = Math.max(l1+1, Math.min(l3-1, (l2 < 0 ? -1 -l2 : l2) -
headerBytes));
while (l3-l2 > 4) ++l2;
while (l2-l1 > 4) ++l1;
hlcBytesLookup = setHlcBytes(l0, l1, l2, l3);
@@ -267,16 +248,13 @@ public class CommandsForKeySerializer
totalBytes += TypeSizes.sizeofUnsignedVInt(nodeIds[i] -
nodeIds[i-1]);
totalBytes += 2;
- Arrays.fill(bytesHistogram, minBasicBytes, maxBasicBytes + 1, 0);
cachedInts().forceDiscard(bytesHistogram);
- prevEpoch = cfk.redundantBefore().epoch();
- prevHlc = cfk.redundantBefore().hlc();
+ prevEpoch = cfk.get(0).epoch();
+ prevHlc = cfk.get(0).hlc();
// account for encoding redundantBefore
totalBytes += TypeSizes.sizeofUnsignedVInt(prevEpoch);
totalBytes += TypeSizes.sizeofUnsignedVInt(prevHlc);
- totalBytes += 2; // flags TODO (expected): pack this along with
uniqueIdBits, as usually zero bits should be needed
- totalBytes += (bitsPerNodeId+7)/8;
if (missingIdCount + executeAtCount > 0)
{
@@ -291,6 +269,20 @@ public class CommandsForKeySerializer
totalBytes += 2;
}
+ // count unmanaged bytes
+ int unmanagedPendingCommitCount = 0;
+ for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
+ {
+ Unmanaged unmanaged = cfk.getUnmanaged(i);
+ if (unmanaged.pending == Unmanaged.Pending.COMMIT)
+ ++unmanagedPendingCommitCount;
+ totalBytes += CommandSerializers.txnId.serializedSize();
+ // TODO (desired): this could be more efficient, e.g.
referencing one of the TxnInfo indexes for timestamp
+ totalBytes += CommandSerializers.timestamp.serializedSize();
+ }
+ totalBytes +=
TypeSizes.sizeofUnsignedVInt(unmanagedPendingCommitCount);
+ totalBytes += TypeSizes.sizeofUnsignedVInt(cfk.unmanagedCount() -
unmanagedPendingCommitCount);
+
ByteBuffer out = ByteBuffer.allocate(totalBytes);
VIntCoding.writeUnsignedVInt32(commandCount, out);
VIntCoding.writeUnsignedVInt32(nodeIdCount, out);
@@ -301,8 +293,6 @@ public class CommandsForKeySerializer
VIntCoding.writeUnsignedVInt(prevEpoch, out);
VIntCoding.writeUnsignedVInt(prevHlc, out);
- out.putShort((short) cfk.redundantBefore().flags());
- writeLeastSignificantBytes(Arrays.binarySearch(nodeIds, 0,
nodeIdCount, cfk.redundantBefore().node.id), (bitsPerNodeId+7)/8, out);
int executeAtMask = executeAtCount > 0 ? 1 : 0;
int missingDepsMask = missingIdCount > 0 ? 1 : 0;
@@ -311,7 +301,7 @@ public class CommandsForKeySerializer
for (int i = 0 ; i < commandCount ; ++i)
{
TxnId txnId = cfk.txnId(i);
- Info info = cfk.info(i);
+ TxnInfo info = cfk.get(i);
InternalStatus status = info.status;
long bits = status.ordinal();
@@ -322,7 +312,7 @@ public class CommandsForKeySerializer
bits |= hasExecuteAt << bitIndex;
bitIndex += statusHasInfo & executeAtMask;
- long hasMissingIds = info.missing != CommandsForKey.NO_TXNIDS
? 1 : 0;
+ long hasMissingIds = info.missing() !=
CommandsForKey.NO_TXNIDS ? 1 : 0;
bits |= hasMissingIds << bitIndex;
bitIndex += statusHasInfo & missingDepsMask;
@@ -392,6 +382,20 @@ public class CommandsForKeySerializer
}
}
+ VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out);
+ VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() -
unmanagedPendingCommitCount, out);
+ Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ?
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
+ for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
+ {
+ Unmanaged unmanaged = cfk.getUnmanaged(i);
+ Invariants.checkState(unmanaged.pending == pending);
+ CommandSerializers.txnId.serialize(unmanaged.txnId, out,
ByteBufferAccessor.instance, out.position());
+ out.position(out.position() +
CommandSerializers.txnId.serializedSize());
+ CommandSerializers.timestamp.serialize(unmanaged.waitingUntil,
out, ByteBufferAccessor.instance, out.position());
+ out.position(out.position() +
CommandSerializers.timestamp.serializedSize());
+ if (--unmanagedPendingCommitCount == 0) pending =
Unmanaged.Pending.APPLY;
+ }
+
if ((executeAtCount | missingIdCount) > 0)
{
int bitsPerCommandId = numberOfBitsToRepresent(commandCount);
@@ -407,21 +411,17 @@ public class CommandsForKeySerializer
for (int i = 0 ; i < commandCount ; ++i)
{
- Info info = cfk.info(i);
- if (info.getClass() == NoInfo.class)
- continue;
-
- TxnId txnId = cfk.txnId(i);
- if (info.executeAt != txnId)
+ TxnInfo txn = cfk.get(i);
+ if (txn.executeAt != txn)
{
- Timestamp executeAt = info.executeAt;
+ Timestamp executeAt = txn.executeAt;
int nodeIdx = Arrays.binarySearch(nodeIds, 0,
nodeIdCount, executeAt.node.id);
if (bitsPerExecuteAt <= 64)
{
- Invariants.checkState(executeAt.epoch() >=
txnId.epoch());
- long executeAtBits = executeAt.epoch() -
txnId.epoch();
+ Invariants.checkState(executeAt.epoch() >=
txn.epoch());
+ long executeAtBits = executeAt.epoch() -
txn.epoch();
int offset = bitsPerExecuteAtEpochDelta;
- executeAtBits |= (executeAt.hlc() - txnId.hlc())
<< offset ;
+ executeAtBits |= (executeAt.hlc() - txn.hlc()) <<
offset ;
offset += bitsPerExecuteAtHlcDelta;
executeAtBits |= ((long)executeAt.flags()) <<
offset;
offset += bitsPerExecuteAtFlags;
@@ -431,9 +431,9 @@ public class CommandsForKeySerializer
}
else
{
- buffer = flushBits(buffer, bufferCount,
executeAt.epoch() - txnId.epoch(), bitsPerExecuteAtEpochDelta, out);
+ buffer = flushBits(buffer, bufferCount,
executeAt.epoch() - txn.epoch(), bitsPerExecuteAtEpochDelta, out);
bufferCount = (bufferCount +
bitsPerExecuteAtEpochDelta) & 63;
- buffer = flushBits(buffer, bufferCount,
executeAt.hlc() - txnId.hlc(), bitsPerExecuteAtHlcDelta, out);
+ buffer = flushBits(buffer, bufferCount,
executeAt.hlc() - txn.hlc(), bitsPerExecuteAtHlcDelta, out);
bufferCount = (bufferCount +
bitsPerExecuteAtHlcDelta) & 63;
buffer = flushBits(buffer, bufferCount,
executeAt.flags(), bitsPerExecuteAtFlags, out);
bufferCount = (bufferCount +
bitsPerExecuteAtFlags) & 63;
@@ -442,16 +442,17 @@ public class CommandsForKeySerializer
}
}
- if (info.missing.length > 0)
+ TxnId[] missing = txn.missing();
+ if (missing.length > 0)
{
int j = 0;
- while (j < info.missing.length - 1)
+ while (j < missing.length - 1)
{
- int missingId = cfk.indexOf(info.missing[j++]);
+ int missingId = cfk.indexOf(missing[j++]);
buffer = flushBits(buffer, bufferCount, missingId,
bitsPerMissingId, out);
bufferCount = (bufferCount + bitsPerMissingId) &
63;
}
- int missingId =
cfk.indexOf(info.missing[info.missing.length - 1]);
+ int missingId = cfk.indexOf(missing[missing.length -
1]);
missingId |= 1L << bitsPerCommandId;
buffer = flushBits(buffer, bufferCount, missingId,
bitsPerMissingId, out);
bufferCount = (bufferCount + bitsPerMissingId) & 63;
@@ -461,6 +462,7 @@ public class CommandsForKeySerializer
writeMostSignificantBytes(buffer, (bufferCount + 7)/8, out);
}
+ Invariants.checkState(!out.hasRemaining());
out.flip();
return out;
}
@@ -494,16 +496,10 @@ public class CommandsForKeySerializer
in = in.duplicate();
int commandCount = VIntCoding.readUnsignedVInt32(in);
if (commandCount == 0)
- {
- long epoch = VIntCoding.readUnsignedVInt(in);
- long hlc = VIntCoding.readUnsignedVInt(in);
- int flags = VIntCoding.readUnsignedVInt32(in);
- Node.Id id = new Node.Id(VIntCoding.readUnsignedVInt32(in));
- return new
CommandsForKey(key).withoutRedundant(TxnId.fromValues(epoch, hlc, flags, id));
- }
+ return new CommandsForKey(key);
- TxnId[] txnIds = new TxnId[commandCount];
- Info[] infos = new Info[commandCount];
+ TxnId[] txnIds = cachedTxnIds().get(commandCount);
+ TxnInfo[] txns = new TxnInfo[commandCount];
int nodeIdCount = VIntCoding.readUnsignedVInt32(in);
int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount);
long nodeIdMask = (1L << bitsPerNodeId) - 1;
@@ -526,12 +522,8 @@ public class CommandsForKeySerializer
hlcBytesLookup = setHlcByteDeltas((flags >>> 5) & 0x3, (flags >>>
7) & 0x3, (flags >>> 9) & 0x3, (flags >>> 11) & 0x3);
}
- long prevEpoch = VIntCoding.readUnsignedVInt32(in);
- long prevHlc = VIntCoding.readUnsignedVInt32(in);
- TxnId redundantBefore = TxnId.fromValues(prevEpoch, prevHlc,
in.getShort(),
-
nodeIds[(int)readLeastSignificantBytes((bitsPerNodeId+7)/8, in)]);
-
-
+ long prevEpoch = VIntCoding.readUnsignedVInt(in);
+ long prevHlc = VIntCoding.readUnsignedVInt(in);
for (int i = 0 ; i < commandCount ; ++i)
{
long header = readLeastSignificantBytes(headerByteCount, in);
@@ -601,12 +593,34 @@ public class CommandsForKeySerializer
: TxnId.fromValues(epoch, hlc, flags,
node);
txnIds[i] = txnId;
- infos[i] = DECODE_INFOS[(executeAtInfoOffset |
missingDepsInfoOffset)*STATUS_COUNT + status.ordinal()];
+ txns[i] = DECODE_INFOS[(executeAtInfoOffset |
missingDepsInfoOffset)*STATUS_COUNT + status.ordinal()];
prevEpoch = epoch;
prevHlc = hlc;
}
+ int unmanagedPendingCommitCount = VIntCoding.readUnsignedVInt32(in);
+ int unmanagedCount = unmanagedPendingCommitCount +
VIntCoding.readUnsignedVInt32(in);
+ Unmanaged[] unmanageds;
+ if (unmanagedCount == 0)
+ {
+ unmanageds = NO_PENDING_UNMANAGED;
+ }
+ else
+ {
+ unmanageds = new Unmanaged[unmanagedCount];
+ Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ?
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
+ for (int i = 0 ; i < unmanagedCount ; ++i)
+ {
+ TxnId txnId = CommandSerializers.txnId.deserialize(in,
ByteBufferAccessor.instance, in.position());
+ in.position(in.position() +
CommandSerializers.txnId.serializedSize());
+ Timestamp waitingUntil =
CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance,
in.position());
+ in.position(in.position() +
CommandSerializers.timestamp.serializedSize());
+ unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil);
+ if (--unmanagedPendingCommitCount == 0) pending =
Unmanaged.Pending.APPLY;
+ }
+ }
+
if (executeAtMasks + missingDepsMasks > 0)
{
TxnId[] missingIdBuffer = cachedTxnIds().get(8);
@@ -630,13 +644,10 @@ public class CommandsForKeySerializer
for (int i = 0 ; i < commandCount ; ++i)
{
- Info info = infos[i];
- if (info.getClass() == NoInfo.class)
- continue;
-
TxnId txnId = txnIds[i];
- Timestamp executeAt = txnId;
- if (info.executeAt == null)
+ TxnInfo placeholder = txns[i];
+ Timestamp executeAt;
+ if (placeholder.executeAt == null)
{
long epoch, hlc;
int flags;
@@ -661,8 +672,12 @@ public class CommandsForKeySerializer
}
executeAt = Timestamp.fromValues(epoch, hlc, flags, id);
}
+ else
+ {
+ executeAt = txnId;
+ }
- TxnId[] missing = info.missing;
+ TxnId[] missing = placeholder.missing();
if (missing == null)
{
int prev = -1;
@@ -684,13 +699,19 @@ public class CommandsForKeySerializer
missingIdCount = 0;
}
- infos[i] = Info.create(txnId, info.status, executeAt, missing);
+ txns[i] = TxnInfo.create(txnId, placeholder.status, executeAt,
missing);
}
cachedTxnIds().forceDiscard(missingIdBuffer, maxIdBufferCount);
}
+ else
+ {
+ for (int i = 0 ; i < commandCount ; ++i)
+ txns[i] = TxnInfo.create(txnIds[i], txns[i].status, txnIds[i]);
+ }
+ cachedTxnIds().forceDiscard(txnIds, commandCount);
- return CommandsForKey.SerializerSupport.create(key, redundantBefore,
txnIds, infos);
+ return CommandsForKey.SerializerSupport.create(key, txns, unmanageds);
}
private static int getHlcBytes(int lookup, int index)
@@ -833,16 +854,16 @@ public class CommandsForKeySerializer
private static final Txn.Kind[] TXN_ID_FLAG_BITS_KIND_LOOKUP = new
Txn.Kind[] { Read, Write, ExclusiveSyncPoint, null };
private static final int STATUS_COUNT = InternalStatus.values().length;
- private static final Info[] DECODE_INFOS = new Info[4 * STATUS_COUNT];
+ private static final TxnInfo[] DECODE_INFOS = new TxnInfo[4 *
STATUS_COUNT];
static
{
for (InternalStatus status : InternalStatus.values())
{
int ordinal = status.ordinal();
- DECODE_INFOS[ordinal] = status.asNoInfo;
- DECODE_INFOS[STATUS_COUNT+ordinal] = Info.createMock(status,
Timestamp.NONE, null);
- DECODE_INFOS[2*STATUS_COUNT+ordinal] = Info.createMock(status,
null, CommandsForKey.NO_TXNIDS);
- DECODE_INFOS[3*STATUS_COUNT+ordinal] = Info.createMock(status,
null, null);
+ DECODE_INFOS[ordinal] = TxnInfo.createMock(TxnId.NONE, status,
TxnId.NONE, CommandsForKey.NO_TXNIDS);
+ DECODE_INFOS[STATUS_COUNT+ordinal] =
TxnInfo.createMock(TxnId.NONE, status, TxnId.NONE, null);
+ DECODE_INFOS[2*STATUS_COUNT+ordinal] =
TxnInfo.createMock(TxnId.NONE, status, null, CommandsForKey.NO_TXNIDS);
+ DECODE_INFOS[3*STATUS_COUNT+ordinal] =
TxnInfo.createMock(TxnId.NONE, status, null, null);
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
index 930807d7f0..3efb9e2c6c 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
@@ -22,40 +22,63 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import accord.local.Command.WaitingOn;
-import accord.primitives.Deps;
+import accord.primitives.Keys;
+import accord.primitives.Routable;
+import accord.primitives.TxnId;
import accord.utils.ImmutableBitSet;
import accord.utils.Invariants;
import accord.utils.SimpleBitSet;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.vint.VIntCoding;
public class WaitingOnSerializer
{
- public static void serialize(WaitingOn waitingOn, DataOutputPlus out)
throws IOException
+ public static void serialize(TxnId txnId, WaitingOn waitingOn,
DataOutputPlus out) throws IOException
{
- // TODO (expected): use run length encoding; we know that at most
1/3rd of bits will be set between the three bitsets
- int length = (waitingOn.deps.txnIdCount() + 63) / 64;
- serialize(length, waitingOn.waitingOnCommit, out);
- serialize(length, waitingOn.waitingOnApply, out);
- serialize(length, waitingOn.appliedOrInvalidated, out);
+ out.writeUnsignedVInt32(waitingOn.keys.size());
+ out.writeUnsignedVInt32(waitingOn.txnIds.size());
+ int keyCount = waitingOn.keys.size();
+ int txnIdCount = waitingOn.txnIds.size();
+ int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
+ serialize(waitingOnLength, waitingOn.waitingOn, out);
+ if (txnId.domain() == Routable.Domain.Range)
+ {
+ int appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
+ serialize(appliedOrInvalidatedLength,
waitingOn.appliedOrInvalidated, out);
+ }
}
- public static WaitingOn deserialize(Deps deps, DataInputPlus in) throws
IOException
+ public static WaitingOn deserialize(TxnId txnId, Keys keys,
SortedArrayList<TxnId> txnIds, DataInputPlus in) throws IOException
{
- int length = (deps.txnIdCount() + 63) / 64;
- ImmutableBitSet waitingOnCommit = deserialize(length, in);
- ImmutableBitSet waitingOnApply = deserialize(length, in);
- ImmutableBitSet appliedOrInvalidated = deserialize(length, in);
- return new WaitingOn(deps, waitingOnCommit, waitingOnApply,
appliedOrInvalidated);
+ int a = in.readUnsignedVInt32();
+ int b = in.readUnsignedVInt32();
+ int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
+ ImmutableBitSet waitingOn = deserialize(waitingOnLength, in);
+ ImmutableBitSet appliedOrInvalidated = null;
+ if (txnId.domain() == Routable.Domain.Range)
+ {
+ int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
+ appliedOrInvalidated = deserialize(appliedOrInvalidatedLength, in);
+ }
+ return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
}
public static long serializedSize(WaitingOn waitingOn)
{
- int length = (waitingOn.deps.txnIdCount() + 63) / 64;
- return serializedSize(length, waitingOn.waitingOnCommit)
- + serializedSize(length, waitingOn.waitingOnApply)
- + serializedSize(length, waitingOn.appliedOrInvalidated);
+ int keyCount = waitingOn.keys.size();
+ int txnIdCount = waitingOn.txnIds.size();
+ int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
+ long size = serializedSize(waitingOnLength, waitingOn.waitingOn);
+ size += TypeSizes.sizeofUnsignedVInt(keyCount);
+ size += TypeSizes.sizeofUnsignedVInt(txnIdCount);
+ if (waitingOn.appliedOrInvalidated == null)
+ return size;
+
+ int appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
+ return size + serializedSize(appliedOrInvalidatedLength,
waitingOn.appliedOrInvalidated);
}
private static void serialize(int length, SimpleBitSet write,
DataOutputPlus out) throws IOException
@@ -81,14 +104,23 @@ public class WaitingOnSerializer
return (long) TypeSizes.LONG_SIZE * length;
}
- public static ByteBuffer serialize(WaitingOn waitingOn) throws IOException
+ public static ByteBuffer serialize(TxnId txnId, WaitingOn waitingOn)
throws IOException
{
- int length = (waitingOn.deps.txnIdCount() + 63) / 64;
- ByteBuffer out = ByteBuffer.allocate(TypeSizes.LONG_SIZE * length * 3);
- serialize(length, waitingOn.waitingOnCommit, out);
- serialize(length, waitingOn.waitingOnApply, out);
- serialize(length, waitingOn.appliedOrInvalidated, out);
- return (ByteBuffer) out.flip();
+ int keyCount = waitingOn.keys.size();
+ int txnIdCount = waitingOn.txnIds.size();
+ int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
+ int appliedOrInvalidatedLength = 0;
+ if (txnId.domain() == Routable.Domain.Range)
+ appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
+
+ ByteBuffer out =
ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(keyCount) +
TypeSizes.sizeofUnsignedVInt(txnIdCount)
+ + TypeSizes.LONG_SIZE *
(waitingOnLength + appliedOrInvalidatedLength));
+ VIntCoding.writeUnsignedVInt32(keyCount, out);
+ VIntCoding.writeUnsignedVInt32(txnIdCount, out);
+ serialize(waitingOnLength, waitingOn.waitingOn, out);
+ if (appliedOrInvalidatedLength > 0)
+ serialize(appliedOrInvalidatedLength,
waitingOn.appliedOrInvalidated, out);
+ return out.flip();
}
private static void serialize(int length, SimpleBitSet write, ByteBuffer
out)
@@ -99,16 +131,23 @@ public class WaitingOnSerializer
out.putLong(bits[i]);
}
- public static WaitingOn deserialize(Deps deps, ByteBuffer in) throws
IOException
+ public static WaitingOn deserialize(TxnId txnId, Keys keys,
SortedArrayList<TxnId> txnIds, ByteBuffer in) throws IOException
{
- int length = (deps.txnIdCount() + 63) / 64;
+ int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
int position = in.position();
- ImmutableBitSet waitingOnCommit = deserialize(position, length, in);
- position += length*8;
- ImmutableBitSet waitingOnApply = deserialize(position, length, in);
- position += length*8;
- ImmutableBitSet appliedOrInvalidated = deserialize(position, length,
in);
- return new WaitingOn(deps, waitingOnCommit, waitingOnApply,
appliedOrInvalidated);
+ int a = VIntCoding.readUnsignedVInt32(in, position); // first vint written by serialize(): keyCount
+ position += TypeSizes.sizeofUnsignedVInt(a);
+ int b = VIntCoding.readUnsignedVInt32(in, position); // second vint written by serialize(): txnIdCount
+ position += TypeSizes.sizeofUnsignedVInt(b); // skip by b's own encoded width; it can differ from a's
+ ImmutableBitSet waitingOn = deserialize(position, waitingOnLength, in);
+ ImmutableBitSet appliedOrInvalidated = null;
+ if (txnId.domain() == Routable.Domain.Range)
+ {
+ position += waitingOnLength*8;
+ int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
+ appliedOrInvalidated = deserialize(position,
appliedOrInvalidatedLength, in);
+ }
+ return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
}
private static ImmutableBitSet deserialize(int position, int length,
ByteBuffer in)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index ec85d63f2c..5e3e9ff831 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -35,9 +35,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.DataStore;
+import accord.api.Key;
import accord.api.Write;
import accord.impl.AbstractSafeCommandStore;
-import accord.impl.CommandsForKeys;
+import accord.impl.TimestampsForKeys;
import accord.impl.TimestampsForKey;
import accord.local.SafeCommandStore;
import accord.primitives.PartialTxn;
@@ -375,7 +376,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
// TODO (expected, efficiency): 99.9999% of the time we can just use
executeAt.hlc(), so can avoid bringing
// cfk into memory by retaining at all times in memory key ranges
that are dirty and must use this logic;
// any that aren't can just use executeAt.hlc
- TimestampsForKey cfk =
CommandsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?,?,?>)
safeStore, (RoutableKey) key, executeAt, true);
+ TimestampsForKey cfk =
TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?,?,?>)
safeStore, (Key) key, executeAt, true);
long timestamp = AccordSafeTimestampsForKey.timestampMicrosFor(cfk,
executeAt, true);
// TODO (low priority - do we need to compute nowInSeconds, or can we
just use executeAt?)
int nowInSeconds = AccordSafeTimestampsForKey.nowInSecondsFor(cfk,
executeAt, true);
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index 8b52bb41e4..dc873f0210 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -126,13 +126,27 @@ public class VIntCoding
return retval;
}
+ @DontInline
+ private static long readUnsignedVIntSlow(ByteBuffer in, int position, byte
firstByte)
+ {
+ int size = numberOfExtraBytesToRead(firstByte);
+ long retval = firstByte & firstByteValueMask(size);
+ for (int ii = 0; ii < size; ii++)
+ {
+ byte b = in.get(position++);
+ retval <<= 8;
+ retval |= b & 0xff;
+ }
+
+ return retval;
+ }
+
public static long readUnsignedVInt(ByteBuffer in)
{
byte firstByte = in.get();
if (firstByte >= 0)
return firstByte;
-
int position = in.position();
int limit = in.limit();
if (limit - position < 8)
@@ -155,6 +169,32 @@ public class VIntCoding
return retval;
}
+ public static long readUnsignedVInt(ByteBuffer in, int position)
+ {
+ byte firstByte = in.get(position++);
+ if (firstByte >= 0)
+ return firstByte;
+
+ int limit = in.limit();
+ if (limit - position < 8)
+ return readUnsignedVIntSlow(in, position, firstByte);
+
+ int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
+ int extraBits = extraBytes * 8;
+
+ long retval = in.getLong(position);
+ if (in.order() == ByteOrder.LITTLE_ENDIAN)
+ retval = Long.reverseBytes(retval);
+
+ // truncate the bytes we read in excess of those we needed
+ retval >>>= 64 - extraBits;
+ // remove the non-value bits from the first byte
+ firstByte &= VIntCoding.firstByteValueMask(extraBytes);
+ // shift the first byte up to its correct position
+ retval |= (long) firstByte << extraBits;
+ return retval;
+ }
+
public static void skipUnsignedVInt(DataInputPlus input) throws IOException
{
int firstByte = input.readByte();
@@ -330,6 +370,11 @@ public class VIntCoding
return checkedCast(readUnsignedVInt(input));
}
+ public static int readUnsignedVInt32(ByteBuffer input, int position)
+ {
+ return checkedCast(readUnsignedVInt(input, position));
+ }
+
// & this with the first byte to give the value part for a given
extraBytesToRead encoded in the byte
public static int firstByteValueMask(int extraBytesToRead)
{
diff --git a/test/conf/logback-dtest-quiet.xml
b/test/conf/logback-dtest-quiet.xml
new file mode 100644
index 0000000000..bb9f983177
--- /dev/null
+++ b/test/conf/logback-dtest-quiet.xml
@@ -0,0 +1,56 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+ <define name="cluster_id"
class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+ <define name="instance_id"
class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+ <!-- Shutdown hook ensures that async appender flushes -->
+ <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+ <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+
<file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L -
%msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ <immediateFlush>true</immediateFlush>
+ </appender>
+
+ <appender name="INSTANCESTDERR" target="System.err"
class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>ERROR</level>
+ </filter>
+ </appender>
+
+ <!-- Un-comment to enable TRACE logging for Accord transactions.
+ <logger name="accord" level="TRACE" />
+ <logger name="org.apache.cassandra.service.accord" level="TRACE" />
+ -->
+
+ <root level="DEBUG">
+ <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race
conditions with appending and searching -->
+ <appender-ref ref="INSTANCESTDERR" />
+ </root>
+</configuration>
diff --git
a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
index aee6aeaeb8..5b27d3d44f 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.api;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
import org.apache.cassandra.distributed.shared.FutureUtils;
@@ -60,6 +61,8 @@ public interface ICoordinator
}
SimpleQueryResult executeWithResult(String query, ConsistencyLevel
consistencyLevel, Object... boundValues);
+ Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable>
callback, String query, ConsistencyLevel consistencyLevel, Object...
boundValues);
+ Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable>
callback, String query, ConsistencyLevel serialConsistencyLevel,
ConsistencyLevel commitConsistencyLevel, Object... boundValues);
default SimpleQueryResult executeWithResult(String query, ConsistencyLevel
serialConsistencyLevel, ConsistencyLevel commitConsistencyLevel, Object...
boundValues)
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 19d80626d3..e89ab8faa3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
import com.google.common.collect.Iterators;
@@ -66,6 +67,30 @@ public class Coordinator implements ICoordinator
return instance().sync(() -> unsafeExecuteInternal(query,
consistencyLevel, boundValues)).call();
}
+ @Override
+ public Future<?> executeWithResult(BiConsumer<SimpleQueryResult,
Throwable> callback, String query, ConsistencyLevel consistencyLevel, Object...
boundValues)
+ {
+ return executeWithResult(callback, query, null, consistencyLevel,
boundValues);
+ }
+
+ @Override
+ public Future<?> executeWithResult(BiConsumer<SimpleQueryResult,
Throwable> callback, String query, ConsistencyLevel serialConsistencyLevel,
ConsistencyLevel commitConsistencyLevel, Object... boundValues)
+ {
+ return instance().async(cb -> {
+ SimpleQueryResult result;
+ try
+ {
+ result = unsafeExecuteInternal(query, serialConsistencyLevel,
commitConsistencyLevel, boundValues);
+ }
+ catch (Throwable t)
+ {
+ callback.accept(null, t);
+ return;
+ }
+ callback.accept(result, null);
+ }).apply(callback);
+ }
+
public Future<SimpleQueryResult> asyncExecuteWithTracingWithResult(UUID
sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object...
boundValues)
{
return instance.async(() -> {
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e2036a2382..464bc3b8bf 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -492,7 +492,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
@Override
public void receiveMessage(IMessage message)
{
- sync(receiveMessageRunnable(message)).accept(false);
+ async(receiveMessageRunnable(message)).apply(false);
}
@Override
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
new file mode 100644
index 0000000000..42e7fbf34a
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+public class AccordLoadTest extends AccordTestBase
+{
+ private static final Logger logger =
LoggerFactory.getLogger(AccordLoadTest.class);
+
+ @BeforeClass
+ public static void setUp() throws IOException
+ {
+ AccordTestBase.setupCluster(builder -> builder.withConfig(config ->
config.set("lwt_strategy", "accord").set("non_serial_write_strategy",
"accord")), 2);
+ }
+
+ @Ignore
+ @Test
+ public void testLoad() throws Exception
+ {
+ test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY
KEY(k))",
+ cluster -> {
+ ICoordinator coordinator = cluster.coordinator(1);
+ final int batchSize = 1000;
+ final int concurrency = 100;
+ final int ratePerSecond = 1000;
+ final int keyCount = 10;
+ for (int i = 1; i <= keyCount; i++)
+ coordinator.execute("INSERT INTO " + qualifiedTableName +
" (k, v) VALUES (0, 0) USING TIMESTAMP 0;", ConsistencyLevel.ALL, i);
+
+ Random random = new Random();
+// CopyOnWriteArrayList<Throwable> exceptions = new
CopyOnWriteArrayList<>();
+ final Semaphore inFlight = new Semaphore(concurrency);
+ final RateLimiter rateLimiter =
RateLimiter.create(ratePerSecond);
+ long testStart = System.nanoTime();
+// while (NANOSECONDS.toMinutes(System.nanoTime() - testStart)
< 10 && exceptions.size() < 10000)
+ while (true)
+ {
+ final EstimatedHistogram histogram = new
EstimatedHistogram(200);
+ long batchStart = System.nanoTime();
+ for (int i = 0 ; i < batchSize ; ++i)
+ {
+ inFlight.acquire();
+ rateLimiter.acquire();
+ long commandStart = System.nanoTime();
+ coordinator.executeWithResult((success, fail) -> {
+ inFlight.release();
+ if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+// else exceptions.add(fail);
+ }, "UPDATE " + qualifiedTableName + " SET v += 1
WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM,
random.nextInt(keyCount));
+ }
+ System.out.printf("%tT rate: %.2f/s\n", new Date(),
(((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() -
batchStart)));
+ System.out.printf("%tT percentiles: %d %d %d %d\n", new
Date(), histogram.percentile(.25)/1000, histogram.percentile(.5)/1000,
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+ }
+ }
+ );
+ }
+
+ @Override
+ protected Logger logger()
+ {
+ return logger;
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 95c495a350..ee38ffceb9 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
import accord.api.Key;
import accord.api.Result;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
import accord.local.CheckedCommands;
import accord.local.Command;
import accord.local.CommandStore;
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index af4e3f738b..fdaf519874 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
import accord.api.Key;
import accord.api.Result;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKeys;
+import accord.local.CommandsForKey;
+import accord.impl.TimestampsForKeys;
import accord.impl.TimestampsForKey;
import accord.local.Command;
import accord.local.CommonAttributes;
@@ -41,10 +41,13 @@ import accord.messages.Apply;
import accord.primitives.Ballot;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
+import accord.primitives.Range;
import accord.primitives.Ranges;
+import accord.primitives.Routable;
import accord.primitives.Route;
import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.ImmutableBitSet;
@@ -102,20 +105,20 @@ public class AccordCommandStoreTest
AtomicLong clock = new AtomicLong(0);
PartialTxn depTxn = createPartialTxn(0);
Key key = (Key)depTxn.keys().get(0);
+ Range range = key.toUnseekable().asRange();
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES
(0, 0, 1)");
TableId tableId = Schema.instance.getTableMetadata("ks", "tbl").id;
- TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1);
- TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1);
- TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1);
- TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
+ TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write,
Routable.Domain.Range);
+ TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write,
Routable.Domain.Range);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write,
Routable.Domain.Range);
PartialDeps dependencies;
try (PartialDeps.Builder builder =
PartialDeps.builder(depTxn.covering()))
{
- builder.add(key, oldTxnId1);
- builder.add(key, oldTxnId2);
+ builder.add(range, oldTxnId1);
+ builder.add(range, oldTxnId2);
dependencies = builder.build();
}
@@ -130,11 +133,9 @@ public class AccordCommandStoreTest
Ballot accepted = ballot(1, clock.incrementAndGet(), 1);
Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1);
attrs.partialDeps(dependencies);
- SimpleBitSet waitingOnCommit = new SimpleBitSet(2);
- waitingOnCommit.set(0);
- SimpleBitSet waitingOnApply = new SimpleBitSet(2);
+ SimpleBitSet waitingOnApply = new SimpleBitSet(3);
waitingOnApply.set(1);
- Command.WaitingOn waitingOn = new Command.WaitingOn(dependencies, new
ImmutableBitSet(waitingOnCommit), new ImmutableBitSet(waitingOnApply), new
ImmutableBitSet(2));
+ Command.WaitingOn waitingOn = new
Command.WaitingOn(dependencies.keyDeps.keys(), dependencies.rangeDeps.txnIds(),
new ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2));
attrs.addListener(new Command.ProxyListener(oldTxnId1));
Pair<Writes, Result> result =
AccordTestUtils.processTxnResult(commandStore, txnId, txn, executeAt);
@@ -183,10 +184,10 @@ public class AccordCommandStoreTest
AccordSafeTimestampsForKey tfk = new
AccordSafeTimestampsForKey(loaded(key, null));
tfk.initialize();
- CommandsForKeys.updateLastExecutionTimestamps(commandStore, tfk,
txnId1, true);
+ TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk,
txnId1, true);
Assert.assertEquals(txnId1.hlc(),
AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId1, true));
- CommandsForKeys.updateLastExecutionTimestamps(commandStore, tfk,
txnId2, true);
+ TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk,
txnId2, true);
Assert.assertEquals(txnId2.hlc(),
AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId2, true));
Assert.assertEquals(txnId2, tfk.current().lastExecutedTimestamp());
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 5b1a1402a2..71e821d329 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -26,7 +26,7 @@ import org.junit.Test;
import accord.api.Key;
import accord.api.RoutingKey;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
import accord.local.Command;
import accord.local.KeyHistory;
import accord.local.Node;
@@ -117,7 +117,7 @@ public class AccordCommandTest
Assert.assertEquals(Status.PreAccepted, command.status());
Assert.assertTrue(command.partialDeps() == null ||
command.partialDeps().isEmpty());
- CommandsForKey cfk = ((AccordSafeCommandStore)
instance).commandsForKey(key(1)).current();
+ CommandsForKey cfk = ((AccordSafeCommandStore)
instance).get(key(1)).current();
Assert.assertTrue(cfk.indexOf(txnId) >= 0);
}));
@@ -145,7 +145,7 @@ public class AccordCommandTest
Assert.assertEquals(Status.Accepted, command.status());
Assert.assertEquals(deps, command.partialDeps());
- CommandsForKey cfk = ((AccordSafeCommandStore)
instance).commandsForKey(key(1)).current();
+ CommandsForKey cfk = ((AccordSafeCommandStore)
instance).get(key(1)).current();
Assert.assertTrue(cfk.indexOf(txnId) >= 0);
}));
@@ -160,7 +160,7 @@ public class AccordCommandTest
Assert.assertTrue(command.hasBeen(Status.Committed));
Assert.assertEquals(commit.partialDeps, command.partialDeps());
- CommandsForKey cfk = ((AccordSafeCommandStore)
instance).commandsForKey(key(1)).current();
+ CommandsForKey cfk = ((AccordSafeCommandStore)
instance).get(key(1)).current();
Assert.assertTrue(cfk.indexOf(txnId) >= 0);
}));
}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 66d0c8e436..0ae2a6abc8 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -30,6 +30,8 @@ import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import javax.annotation.Nullable;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
@@ -59,6 +61,7 @@ import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
+import accord.primitives.Routable;
import accord.primitives.Route;
import accord.primitives.Seekable;
import accord.primitives.Seekables;
@@ -199,8 +202,8 @@ public class AccordTestUtils
@Override public void executed(Command command, ProgressShard
progressShard) {}
@Override public void clear(TxnId txnId) {}
@Override public void durable(Command command) {}
- @Override
- public void waiting(SafeCommand blockedBy, LocalExecution
blockedUntil, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants) {}
+ @Override public void waiting(SafeCommand blockedBy, LocalExecution
blockedUntil, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants) {}
+ @Override public void waiting(TxnId blockedBy, LocalExecution
blockedUntil, @Nullable Route<?> blockedOnRoute, @Nullable Participants<?>
blockedOnParticipants) {}
};
public static TxnId txnId(long epoch, long hlc, int node)
@@ -213,6 +216,11 @@ public class AccordTestUtils
return new TxnId(epoch, hlc, kind, Key, new Node.Id(node));
}
+ public static TxnId txnId(long epoch, long hlc, int node, Txn.Kind kind,
Routable.Domain domain)
+ {
+ return new TxnId(epoch, hlc, kind, domain, new Node.Id(node));
+ }
+
public static Timestamp timestamp(long epoch, long hlc, int node)
{
return Timestamp.fromValues(epoch, hlc, new Node.Id(node));
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 8c952f02c0..33f0dc2458 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -32,13 +32,12 @@ import org.junit.BeforeClass;
import org.junit.Test;
import accord.api.Key;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
import accord.impl.TimestampsForKey;
import accord.local.Command;
import accord.local.KeyHistory;
import accord.primitives.Keys;
import accord.primitives.PartialTxn;
-import accord.primitives.RoutableKey;
import accord.primitives.TxnId;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
@@ -96,7 +95,7 @@ public class AsyncLoaderTest
AccordStateCache.Instance<TxnId, Command, AccordSafeCommand>
commandCache = commandStore.commandCache();
commandStore.executeBlocking(() -> commandStore.setCapacity(1024));
- AccordStateCache.Instance<RoutableKey, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsCache =
commandStore.timestampsForKeyCache();
+ AccordStateCache.Instance<Key, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsCache =
commandStore.timestampsForKeyCache();
TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(0);
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
@@ -375,7 +374,7 @@ public class AsyncLoaderTest
@Test
public void inProgressCFKSaveTest()
{
- inProgressCFKSaveTest(COMMANDS,
AccordCommandStore::commandsForKeyCache, context -> context.commandsForKey,
CommandsForKey::new, (cfk, u) -> cfk.update(null, u));
+ this.inProgressCFKSaveTest(COMMANDS,
AccordCommandStore::commandsForKeyCache, context -> context.commandsForKey,
CommandsForKey::new, (cfk, u) -> cfk.update(null, u));
}
@Test
@@ -384,7 +383,7 @@ public class AsyncLoaderTest
inProgressCFKSaveTest(TIMESTAMPS,
AccordCommandStore::timestampsForKeyCache, context -> context.timestampsForKey,
TimestampsForKey::new, (tfk, c) -> new TimestampsForKey(tfk.key(),
c.executeAt(), c.executeAt().hlc(), c.executeAt()));
}
- private <T1, T2 extends AccordSafeState<RoutableKey, T1>, C extends
AccordStateCache.Instance<RoutableKey, T1, T2>> void
inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C>
getter, Function<Context, TreeMap<?, ?>> inContext, Function<Key, T1>
initialiser, BiFunction<T1, Command, T1> update)
+ private <T1, T2 extends AccordSafeState<Key, T1>, C extends
AccordStateCache.Instance<Key, T1, T2>> void inProgressCFKSaveTest(KeyHistory
history, Function<AccordCommandStore, C> getter, Function<Context, TreeMap<?,
?>> inContext, Function<Key, T1> initialiser, BiFunction<T1, Command, T1>
update)
{
AtomicLong clock = new AtomicLong(0);
ManualExecutor executor = new ManualExecutor();
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index b235d1ae59..a5ece26a51 100644
---
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.RoutingKey;
-import accord.impl.SafeCommandsForKey;
+import accord.local.SafeCommandsForKey;
import accord.local.CheckedCommands;
import accord.local.Command;
import accord.local.PreLoadContext;
@@ -53,7 +53,6 @@ import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
@@ -259,7 +258,7 @@ public class AsyncOperationTest
Accept accept =
Accept.SerializerSupport.create(txnId, partialRoute,
txnId.epoch(), txnId.epoch(), false, Ballot.ZERO, executeAt, partialTxn.keys(),
deps);
Commit commit =
- Commit.SerializerSupport.create(txnId, partialRoute,
txnId.epoch(), Commit.Kind.Commit, Ballot.ZERO, executeAt, partialTxn.keys(),
partialTxn, deps, route, null);
+ Commit.SerializerSupport.create(txnId, partialRoute,
txnId.epoch(), Commit.Kind.CommitSlowPath, Ballot.ZERO, executeAt,
partialTxn.keys(), partialTxn, deps, route, null);
Commit stable =
Commit.SerializerSupport.create(txnId, partialRoute,
txnId.epoch(), Commit.Kind.StableSlowPath, Ballot.ZERO, executeAt,
partialTxn.keys(), partialTxn, deps, route, null);
@@ -482,8 +481,7 @@ public class AsyncOperationTest
}
try
{
- //TODO this is due to bad typing for Instance, it doesn't use ?
extends RoutableKey
- assertNoReferences(commandStore.commandsForKeyCache(),
(Iterable<RoutableKey>) (Iterable<?>) keys);
+ assertNoReferences(commandStore.commandsForKeyCache(), keys);
}
catch (AssertionError e)
{
@@ -524,8 +522,7 @@ public class AsyncOperationTest
private static void awaitDone(AccordCommandStore commandStore, List<TxnId>
ids, Keys keys)
{
awaitDone(commandStore.commandCache(), ids);
- //TODO this is due to bad typing for Instance, it doesn't use ?
extends RoutableKey
- awaitDone(commandStore.commandsForKeyCache(), (Iterable<RoutableKey>)
(Iterable<?>) keys);
+ awaitDone(commandStore.commandsForKeyCache(), keys);
}
private static <T> void awaitDone(AccordStateCache.Instance<T, ?, ?>
cache, Iterable<T> keys)
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index e12b3fbf87..405f92dc59 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -38,9 +38,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import accord.api.Key;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.InternalStatus;
+import accord.local.CommandsForKey;
+import accord.local.CommandsForKey.InternalStatus;
import accord.local.Command;
+import accord.local.CommandsForKey.TxnInfo;
import accord.local.CommonAttributes;
import accord.local.CommonAttributes.Mutable;
import accord.local.Listeners;
@@ -241,9 +242,12 @@ public class CommandsForKeySerializerTest
List<TxnId> deps = cmds[i].deps;
List<TxnId> missing = cmds[i].missing;
for (int j = 0 ; j < limit ; ++j)
- if (i != j) deps.add(cmds[j].txnId);
+ {
+ if (i != j && cmds[i].txnId.kind().witnesses(cmds[j].txnId))
+ deps.add(cmds[j].txnId);
+ }
- int missingCount = Math.min(limit - (limit > i ? 1 : 0),
missingCountSupplier.getAsInt());
+ int missingCount = Math.min(deps.size(),
missingCountSupplier.getAsInt());
while (missingCount > 0)
{
int remove = source.nextInt(deps.size());
@@ -267,14 +271,14 @@ public class CommandsForKeySerializerTest
{
InternalStatus status =
InternalStatus.from(cmds[j].saveStatus);
if (status == null || !status.hasInfo) continue;
- if (status.depsKnownBefore(cmds[j].txnId,
cmds[j].executeAt).compareTo(cmds[i].txnId) > 0 &&
Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0)
+ if (cmds[j].txnId.kind().witnesses(cmds[i].txnId) &&
status.depsKnownBefore(cmds[j].txnId,
cmds[j].executeAt).compareTo(cmds[i].txnId) > 0 &&
Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0)
continue outer;
}
for (int j = i + 1 ; j < cmds.length ; ++j)
{
InternalStatus status =
InternalStatus.from(cmds[j].saveStatus);
if (status == null || !status.hasInfo) continue;
- if (Collections.binarySearch(cmds[j].missing, cmds[i].txnId) <
0)
+ if (cmds[j].txnId.kind().witnesses(cmds[i].txnId) &&
Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0)
continue outer;
}
cmds[i].invisible = true;
@@ -322,7 +326,7 @@ public class CommandsForKeySerializerTest
@Test
public void serde()
{
-// testOne(1821931462020409370L);
+ testOne(-6946067792202944553L);
Random random = new Random();
for (int i = 0 ; i < 10000 ; ++i)
{
@@ -426,13 +430,13 @@ public class CommandsForKeySerializerTest
Assert.assertTrue(cmd.invisible);
continue;
}
- CommandsForKey.Info info = cfk.info(i);
+ TxnInfo info = cfk.get(i);
InternalStatus expectStatus =
InternalStatus.from(cmd.saveStatus);
if (expectStatus == null) expectStatus =
InternalStatus.TRANSITIVELY_KNOWN;
if (expectStatus.hasInfo)
- Assert.assertEquals(cmd.executeAt,
info.executeAt(cfk.txnId(i)));
+ Assert.assertEquals(cmd.executeAt, info.executeAt);
Assert.assertEquals(expectStatus, info.status);
- Assert.assertArrayEquals(cmd.missing.toArray(TxnId[]::new),
info.missing);
+ Assert.assertArrayEquals(cmd.missing.toArray(TxnId[]::new),
info.missing());
++i;
}
@@ -461,11 +465,12 @@ public class CommandsForKeySerializerTest
next = txnIdGen.next(rs0);
return next;
}).unique().ofSizeBetween(0, 10).next(rs);
- CommandsForKey.Info[] info = new CommandsForKey.Info[ids.length];
+ TxnInfo[] info = new TxnInfo[ids.length];
for (int i = 0; i < info.length; i++)
- info[i] = rs.pick(InternalStatus.values()).asNoInfo;
- Arrays.sort(ids, Comparator.naturalOrder());
- CommandsForKey expected =
CommandsForKey.SerializerSupport.create(pk, redudentBefore, ids, info);
+ info[i] = TxnInfo.create(ids[i],
rs.pick(InternalStatus.values()), ids[i], CommandsForKey.NO_TXNIDS);
+ Arrays.sort(info, Comparator.naturalOrder());
+
+ CommandsForKey expected =
CommandsForKey.SerializerSupport.create(pk, info,
CommandsForKey.NO_PENDING_UNMANAGED);
ByteBuffer buffer =
CommandsForKeySerializer.toBytesWithoutKey(expected);
CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk,
buffer);
@@ -479,10 +484,10 @@ public class CommandsForKeySerializerTest
long tokenValue = -2311778975040348869L;
DecoratedKey key =
Murmur3Partitioner.instance.decorateKey(Murmur3Partitioner.LongToken.keyForToken(tokenValue));
PartitionKey pk = new
PartitionKey(TableId.fromString("1b255f4d-ef25-40a6-0000-000000000009"), key);
+ TxnId txnId = TxnId.fromValues(11,34052499,2,1);
CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk,
- TxnId.fromValues(0,0,0,0),
- new TxnId[]
{TxnId.fromValues(11,34052499,2,1)},
- new CommandsForKey.Info[]
{ InternalStatus.PREACCEPTED.asNoInfo});
+ new TxnInfo[] {
TxnInfo.create(txnId, InternalStatus.PREACCEPTED, txnId,
CommandsForKey.NO_TXNIDS) },
+
CommandsForKey.NO_PENDING_UNMANAGED);
ByteBuffer buffer =
CommandsForKeySerializer.toBytesWithoutKey(expected);
CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk,
buffer);
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
index 5d5fc1bf56..3df7d87e0d 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
@@ -23,6 +23,8 @@ import org.junit.Test;
import accord.local.Command;
import accord.primitives.Deps;
+import accord.primitives.Routable;
+import accord.primitives.TxnId;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.SimpleBitSet;
@@ -52,19 +54,20 @@ public class WaitingOnSerializerTest
{
DataOutputBuffer buffer = new DataOutputBuffer();
qt().forAll(waitingOnGen()).check(waitingOn -> {
+ TxnId txnId = TxnId.NONE;
+ if (waitingOn.appliedOrInvalidated != null) txnId = new
TxnId(txnId.epoch(), txnId.hlc(), txnId.kind(), Routable.Domain.Range,
txnId.node);
buffer.clear();
long expectedSize = WaitingOnSerializer.serializedSize(waitingOn);
- WaitingOnSerializer.serialize(waitingOn, buffer);
+ WaitingOnSerializer.serialize(txnId, waitingOn, buffer);
Assertions.assertThat(buffer.getLength()).isEqualTo(expectedSize);
- Command.WaitingOn read =
WaitingOnSerializer.deserialize(waitingOn.deps, new
DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false));
+ Command.WaitingOn read = WaitingOnSerializer.deserialize(txnId,
waitingOn.keys, waitingOn.txnIds, new
DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false));
Assertions.assertThat(read)
.isEqualTo(waitingOn)
-
.isEqualTo(WaitingOnSerializer.deserialize(waitingOn.deps,
WaitingOnSerializer.serialize(waitingOn)));
+ .isEqualTo(WaitingOnSerializer.deserialize(txnId,
waitingOn.keys, waitingOn.txnIds, WaitingOnSerializer.serialize(txnId,
waitingOn)));
});
}
- private enum WaitingOnSets
- {COMMIT, APPLY, APPLYED_OR_INVALIDATED}
+ private enum WaitingOnSets { APPLY, APPLIED_OR_INVALIDATED }
private static Gen<Command.WaitingOn> waitingOnGen()
{
@@ -75,22 +78,20 @@ public class WaitingOnSerializerTest
return rs -> {
Deps deps = depsGen.next(rs);
if (deps.isEmpty()) return Command.WaitingOn.EMPTY;
- int[] selected = Gens.arrays(Gens.ints().between(0,
deps.txnIdCount() - 1)).unique().ofSizeBetween(0, deps.txnIdCount() -
1).next(rs);
- SimpleBitSet waitingOnCommit = new SimpleBitSet(deps.txnIdCount(),
false);
- SimpleBitSet waitingOnApply = new SimpleBitSet(deps.txnIdCount(),
false);
- SimpleBitSet appliedOrInvalidated = new
SimpleBitSet(deps.txnIdCount(), false);
+ int txnIdCount = deps.rangeDeps.txnIdCount();
+ int keyCount = deps.keyDeps.keys().size();
+ int[] selected = Gens.arrays(Gens.ints().between(0, txnIdCount +
keyCount - 1)).unique().ofSizeBetween(0, txnIdCount + keyCount).next(rs);
+ SimpleBitSet waitingOn = new SimpleBitSet(txnIdCount + keyCount,
false);
+ SimpleBitSet appliedOrInvalidated = rs.nextBoolean() ? null : new
SimpleBitSet(txnIdCount, false);
for (int i : selected)
{
- WaitingOnSets set = sets.next(rs);
+ WaitingOnSets set = appliedOrInvalidated == null || i >=
txnIdCount ? WaitingOnSets.APPLY : sets.next(rs);
switch (set)
{
- case COMMIT:
- waitingOnCommit.set(i);
- break;
case APPLY:
- waitingOnApply.set(i);
+ waitingOn.set(i);
break;
- case APPLYED_OR_INVALIDATED:
+ case APPLIED_OR_INVALIDATED:
appliedOrInvalidated.set(i);
break;
default:
@@ -98,7 +99,7 @@ public class WaitingOnSerializerTest
}
}
- return new Command.WaitingOn(deps,
Utils.ensureImmutable(waitingOnCommit), Utils.ensureImmutable(waitingOnApply),
Utils.ensureImmutable(appliedOrInvalidated));
+ return new Command.WaitingOn(deps.keyDeps.keys(),
deps.rangeDeps.txnIds(), Utils.ensureImmutable(waitingOn),
Utils.ensureImmutable(appliedOrInvalidated));
};
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org