This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new df492dfd - add shareable APPLIED and INVALIDATED implementations of
Result - API changes to support splicing in complete update fragments from
PartialTxn as mutations are finally being applied
df492dfd is described below
commit df492dfd2ffe993c33761d0531ac5b979b80f080
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Mar 23 14:47:54 2023 -0500
- add shareable APPLIED and INVALIDATED implementations of Result
- API changes to support splicing in complete update fragments from
PartialTxn as mutations are finally being applied
patch by Caleb Rackliffe; reviewed by David Capwell, Benedict Elliot Smith,
and Ariel Weisberg for CASSANDRA-18355
---
accord-core/src/main/java/accord/api/Result.java | 10 ++++++++--
accord-core/src/main/java/accord/api/Write.java | 3 ++-
accord-core/src/main/java/accord/local/Command.java | 2 +-
accord-core/src/main/java/accord/local/Commands.java | 2 +-
accord-core/src/main/java/accord/primitives/Writes.java | 4 ++--
accord-core/src/test/java/accord/impl/list/ListWrite.java | 3 ++-
accord-core/src/test/java/accord/impl/mock/MockStore.java | 2 +-
accord-core/src/test/java/accord/messages/ReadDataTest.java | 2 +-
.../src/main/java/accord/maelstrom/MaelstromWrite.java | 3 ++-
9 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Result.java b/accord-core/src/main/java/accord/api/Result.java
index d8b8fd6d..0509c476 100644
--- a/accord-core/src/main/java/accord/api/Result.java
+++ b/accord-core/src/main/java/accord/api/Result.java
@@ -23,11 +23,17 @@ import accord.primitives.ProgressToken;
/**
* A result to be returned to a client, or be stored in a node's command state.
- *
- * TODO (expected, efficiency): support minimizing the result for storage in a node's command state (e.g. to only retain success/failure)
*/
public interface Result extends Outcome
{
+ Result APPLIED = new Result() { };
+
+ Result INVALIDATED = new Result()
+ {
+ @Override
+ public ProgressToken asProgressToken() { return ProgressToken.INVALIDATED; }
+ };
+
@Override
default ProgressToken asProgressToken() { return ProgressToken.APPLIED; }
}
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index ebe25903..62635379 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -19,6 +19,7 @@
package accord.api;
import accord.local.SafeCommandStore;
+import accord.primitives.PartialTxn;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.utils.async.AsyncChain;
@@ -30,5 +31,5 @@ import accord.utils.async.AsyncChain;
*/
public interface Write
{
- AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
+ AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store, PartialTxn txn);
}
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 47f9caba..194fe2da 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -650,7 +650,7 @@ public abstract class Command implements CommonAttributes
public static Truncated invalidated(TxnId txnId, Listeners.Immutable durableListeners)
{
- return new Truncated(txnId, SaveStatus.Invalidated, DurableOrInvalidated, null, Timestamp.NONE, durableListeners, null, null);
+ return new Truncated(txnId, SaveStatus.Invalidated, DurableOrInvalidated, null, Timestamp.NONE, durableListeners, null, Result.INVALIDATED);
}
@Override
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index 38d3f18b..52c37a5f 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -582,7 +582,7 @@ public class Commands
// that was pre-bootstrap for some range (so redundant and we may have gone ahead of), but had to be executed locally
// for another range
CommandStore unsafeStore = safeStore.commandStore();
- return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()))
+ return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()), command.partialTxn())
.flatMap(unused -> unsafeStore.submit(context, ss -> {
postApply(ss, txnId);
return null;
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index 6cc08f8f..fb908abe 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -63,7 +63,7 @@ public class Writes
return Objects.hash(executeAt, keys, write);
}
- public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges)
+ public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges, PartialTxn txn)
{
if (write == null)
return SUCCESS;
@@ -72,7 +72,7 @@ public class Writes
return SUCCESS;
List<AsyncChain<Void>> futures = Routables.foldl(keys, ranges, (key, accumulate, index) -> {
- accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
+ accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore(), txn));
return accumulate;
}, new ArrayList<>());
return AsyncChains.reduce(futures, (l, r) -> null);
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 44e0a963..e206aec5 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -23,6 +23,7 @@ import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
+import accord.primitives.PartialTxn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write
}
@Override
- public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store, PartialTxn txn)
{
ListStore s = (ListStore) store;
if (!containsKey(key))
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index db8b0219..8fa17d7e 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -44,7 +44,7 @@ public class MockStore implements DataStore
public static final Result RESULT = new Result() {};
public static final Query QUERY = (txnId, executeAt, data, read, update) -> RESULT;
- public static final Write WRITE = (key, commandStore, executeAt, store) -> Writes.SUCCESS;
+ public static final Write WRITE = (key, commandStore, executeAt, store, command) -> Writes.SUCCESS;
public static Read read(Seekables<?, ?> keys)
{
diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java b/accord-core/src/test/java/accord/messages/ReadDataTest.java
index fdb40fc6..27c3e9bf 100644
--- a/accord-core/src/test/java/accord/messages/ReadDataTest.java
+++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java
@@ -286,7 +286,7 @@ class ReadDataTest
{
AsyncResults.SettableResult<Void> writeResult = new AsyncResults.SettableResult<>();
Write write = Mockito.mock(Write.class);
- Mockito.when(write.apply(any(), any(), any(), any())).thenReturn(writeResult);
+ Mockito.when(write.apply(any(), any(), any(), any(), any())).thenReturn(writeResult);
Writes writes = new Writes(txnId, executeAt, keys, write);
forEach(store -> check(store.execute(PreLoadContext.contextFor(txnId, keys), safe -> {
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index 55a14f21..eeeeb926 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -22,6 +22,7 @@ import accord.api.Key;
import accord.api.DataStore;
import accord.api.Write;
import accord.local.SafeCommandStore;
+import accord.primitives.PartialTxn;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.Writes;
@@ -33,7 +34,7 @@ import java.util.TreeMap;
public class MaelstromWrite extends TreeMap<Key, Value> implements Write
{
@Override
- public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store, PartialTxn txn)
{
MaelstromStore s = (MaelstromStore) store;
if (containsKey(key))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]