This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 10671745a2 CEP-15 (Accord) Original and recover coordinators may hit a
race condition with PreApply where reads and writes are interleaved, causing
one of the coordinators to see the writes from the other
10671745a2 is described below
commit 10671745a254b0a7acf50310d7504896c9f2c584
Author: David Capwell <[email protected]>
AuthorDate: Fri Apr 7 15:39:42 2023 -0700
CEP-15 (Accord) Original and recover coordinators may hit a race condition
with PreApply where reads and writes are interleaved, causing one of the
coordinators to see the writes from the other
patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-18422
---
modules/accord | 2 +-
.../service/accord/AccordCommandStore.java | 8 +
.../simulator/paxos/HistoryValidatorTest.java | 225 +++++++++++++++------
.../service/accord/async/AsyncOperationTest.java | 13 +-
4 files changed, 181 insertions(+), 67 deletions(-)
diff --git a/modules/accord b/modules/accord
index bc81f81c75..08aaab6e33 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit bc81f81c75f93c73989a30bbc51b5c241a893c1a
+Subproject commit 08aaab6e33d43406e0649146144e4df67648602a
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 5c9f3e4e9d..70962298f4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.accord;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import accord.primitives.RoutableKey;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import org.apache.cassandra.service.accord.async.AsyncOperation;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
@@ -187,6 +189,12 @@ public class AccordCommandStore implements CommandStore
return AsyncOperation.create(this, loadCtx, function);
}
+ @Override
+ public <T> AsyncChain<T> submit(Callable<T> task)
+ {
+ return AsyncChains.ofCallable(executor, task);
+ }
+
public DataStore dataStore()
{
return dataStore;
diff --git a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
index 6c773fcca8..c9cff2891f 100644
--- a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
+++ b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
@@ -43,6 +43,7 @@ import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntIntHashMap;
import com.carrotsearch.hppc.IntIntMap;
import com.carrotsearch.hppc.IntSet;
+import com.carrotsearch.hppc.cursors.IntCursor;
import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.utils.Clock;
import org.assertj.core.api.AbstractThrowableAssert;
@@ -281,6 +282,32 @@ public class HistoryValidatorTest
);
}
+ private static String trim(String log, int... keys)
+ {
+ // this is dead code, but exists to help when new validation errors are detected
+ // the logic will shrink the history to only contain transactions that contain the set of keys
+ IntSet set = new IntHashSet();
+ IntStream.of(keys).forEach(set::add);
+ Parsed parsed = parse(log);
+ StringBuilder sb = new StringBuilder();
+ for (Witness w : parsed.witnesses)
+ {
+ boolean match = false;
+ for (IntCursor pk : w.pks())
+ {
+ if (set.contains(pk.value))
+ {
+ match = true;
+ break;
+ }
+ }
+ if (!match) continue;
+ sb.append(w).append("\n");
+ }
+ return sb.toString();
+ }
+
+
private void requiresMultiKeySupport()
{
Assume.assumeTrue("Validator " + factory.getClass() + " does not
support multi-key", factory instanceof StrictSerializabilityValidator.Factory);
@@ -356,79 +383,146 @@ public class HistoryValidatorTest
return new Event(EnumSet.of(Event.Type.WRITE), pk, null);
}
- private void fromLog(String log)
+ private interface Operation
{
- IntSet pks = new IntHashSet();
- class Read
+ int pk();
+ void check(HistoryValidator.Checker check);
+ void appendString(StringBuilder sb);
+ }
+
+ private static class Read implements Operation
+ {
+ final int pk, id, count;
+ final int[] seq;
+
+ Read(int pk, int id, int count, int[] seq)
{
- final int pk, id, count;
- final int[] seq;
+ this.pk = pk;
+ this.id = id;
+ this.count = count;
+ this.seq = seq;
+ }
- Read(int pk, int id, int count, int[] seq)
- {
- this.pk = pk;
- this.id = id;
- this.count = count;
- this.seq = seq;
- }
+ @Override
+ public int pk()
+ {
+ return pk;
}
- class Write
+
+ @Override
+ public void check(HistoryValidator.Checker check)
{
- final int pk, id;
- final boolean success;
+ check.read(pk, id, count, seq);
+ }
- Write(int pk, int id, boolean success)
- {
- this.pk = pk;
- this.id = id;
- this.success = success;
- }
+ @Override
+ public void appendString(StringBuilder sb)
+ {
+ sb.append("read(pk=").append(pk).append(",
id=").append(id).append(", count=").append(count).append(",
seq=").append(Arrays.toString(seq)).append(")\n");
}
- class Witness
+ }
+
+ private static class Write implements Operation
+ {
+ final int pk, id;
+ final boolean success;
+
+ Write(int pk, int id, boolean success)
{
- final int start, end;
- final List<Object> actions = new ArrayList<>();
+ this.pk = pk;
+ this.id = id;
+ this.success = success;
+ }
- Witness(int start, int end)
- {
- this.start = start;
- this.end = end;
- }
+ @Override
+ public int pk()
+ {
+ return pk;
+ }
- void read(int pk, int id, int count, int[] seq)
- {
- actions.add(new Read(pk, id, count, seq));
- }
+ @Override
+ public void check(HistoryValidator.Checker check)
+ {
+ check.write(pk, id, success);
+ }
- void write(int pk, int id, boolean success)
- {
- actions.add(new Write(pk, id, success));
- }
+ @Override
+ public void appendString(StringBuilder sb)
+ {
+ sb.append("write(pk=").append(pk).append(",
id=").append(id).append(", success=").append(success).append(")\n");
+ }
+ }
+
+ private static class Witness
+ {
+ final int start, end;
+ final List<Operation> actions = new ArrayList<>();
+
+ Witness(int start, int end)
+ {
+ this.start = start;
+ this.end = end;
+ }
+
+ void read(int pk, int id, int count, int[] seq)
+ {
+ actions.add(new Read(pk, id, count, seq));
+ }
- void process(HistoryValidator validator)
+ void write(int pk, int id, boolean success)
+ {
+ actions.add(new Write(pk, id, success));
+ }
+
+ void process(HistoryValidator validator)
+ {
+ try (HistoryValidator.Checker check = validator.witness(start,
end))
{
- try (HistoryValidator.Checker check = validator.witness(start,
end))
- {
- for (Object a : actions)
- {
- if (a instanceof Read)
- {
- Read read = (Read) a;
- check.read(read.pk, read.id, read.count, read.seq);
- }
- else
- {
- Write write = (Write) a;
- check.write(write.pk, write.id, write.success);
- }
- }
- }
+ for (Operation a : actions)
+ a.check(check);
}
}
+
+ IntSet pks()
+ {
+ IntSet pks = new IntHashSet();
+ for (Operation action : actions)
+ pks.add(action.pk());
+ return pks;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Witness(start=").append(start).append(",
end=").append(end).append(")\n");
+ for (Operation a : actions)
+ a.appendString(sb.append('\t'));
+ return sb.toString();
+ }
+ }
+
+ private static class Parsed
+ {
+ private final int[] keys;
+ private final List<Witness> witnesses;
+
+ private Parsed(int[] keys, List<Witness> witnesses)
+ {
+ this.keys = keys;
+ this.witnesses = witnesses;
+ }
+ }
+
+ private static Parsed parse(String log)
+ {
+ IntSet pks = new IntHashSet();
List<Witness> witnesses = new ArrayList<>();
Witness current = null;
for (String line : log.split("\n"))
{
+ if (line.trim().isEmpty())
+ continue;
if (line.startsWith("Witness"))
{
if (current != null)
@@ -468,9 +562,26 @@ public class HistoryValidatorTest
witnesses.add(current);
int[] keys = pks.toArray();
Arrays.sort(keys);
- HistoryValidator validator = factory.create(keys);
- for (Witness w : witnesses)
- w.process(validator);
+ return new Parsed(keys, witnesses);
+ }
+
+ private void fromLog(String log)
+ {
+ Parsed parsed = parse(log);
+ HistoryValidator validator = factory.create(parsed.keys);
+ for (Witness w : parsed.witnesses)
+ {
+ try
+ {
+ w.process(validator);
+ }
+ catch (HistoryViolation e)
+ {
+ HistoryViolation hv = new HistoryViolation(e.primaryKey,
"Violation detected for witnessed action " + w + "; " + e.getMessage() + ";\n"
+ log);
+ hv.setStackTrace(e.getStackTrace());
+ throw hv;
+ }
+ }
}
private static class Event
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index b76793aaf7..638b9cbede 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
import accord.api.RoutingKey;
import accord.impl.SafeCommandsForKey;
+import accord.local.CheckedCommands;
import accord.local.Command;
-import accord.local.Commands;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
@@ -194,14 +194,9 @@ public class AsyncOperationTest
try
{
return
getUninterruptibly(commandStore.submit(PreLoadContext.contextFor(Collections.singleton(txnId),
partialTxn.keys()), safe -> {
- Commands.AcceptOutcome result = Commands.preaccept(safe,
txnId, partialTxn, route, null);
- if (result != Commands.AcceptOutcome.Success) throw new
IllegalStateException("Command mutation rejected: " + result);
-
- result = Commands.accept(safe, txnId, Ballot.ZERO,
partialRoute, partialTxn.keys(), null, executeAt, deps);
- if (result != Commands.AcceptOutcome.Success) throw new
IllegalStateException("Command mutation rejected: " + result);
-
- Commands.CommitOutcome commit = Commands.commit(safe, txnId,
route, null, partialTxn, executeAt, deps);
- if (commit != Commands.CommitOutcome.Success) throw new
IllegalStateException("Command mutation rejected: " + result);
+ CheckedCommands.preaccept(safe, txnId, partialTxn, route,
null);
+ CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute,
partialTxn.keys(), null, executeAt, deps);
+ CheckedCommands.commit(safe, txnId, route, null, partialTxn,
executeAt, deps);
// clear cache
long cacheSize = commandStore.getCacheSize();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]