This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 8d8283b909 Refactor AccordTestBase to block retries on non-idempotent
transactions. Some tests may be flaky now due to Preempted being thrown.
8d8283b909 is described below
commit 8d8283b90935b1436aeb3245fa671e5b6a93be3e
Author: David Capwell <[email protected]>
AuthorDate: Mon Dec 19 16:07:20 2022 -0800
Refactor AccordTestBase to block retries on non-idempotent transactions.
Some tests may be flaky now due to Preempted being thrown.
---
.../cql3/statements/ModificationStatement.java | 12 ++++
.../cql3/statements/TransactionStatement.java | 5 ++
.../cql3/transactions/ReferenceOperation.java | 10 +++
.../distributed/test/accord/AccordCQLTest.java | 10 +--
.../distributed/test/accord/AccordTestBase.java | 71 ++++++++++++++++++++--
5 files changed, 97 insertions(+), 11 deletions(-)
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 83f52a4a0b..89519df844 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -809,6 +809,18 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
return new TxnReferenceOperations(metadata, clustering, regularOps,
staticOps);
}
+ @VisibleForTesting
+ public void migrateReadRequiredOperations()
+ {
+ operations.migrateReadRequiredOperations();
+ }
+
+ @VisibleForTesting
+ public List<ReferenceOperation> getSubstitutions()
+ {
+ return operations.allSubstitutions();
+ }
+
public TxnWrite.Fragment getTxnWriteFragment(int index, ClientState state,
QueryOptions options)
{
// When an Operation requires a read, this cannot be done right away
and must be done by the transaction itself,
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index baead627dc..4d3c732da0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -131,6 +131,11 @@ public class TransactionStatement implements CQLStatement
this.bindVariables = bindVariables;
}
+ public List<ModificationStatement> getUpdates()
+ {
+ return updates;
+ }
+
@Override
public List<ColumnSpecification> getBindVariables()
{
diff --git
a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
index e89e616545..d4858eec8e 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
@@ -76,6 +76,16 @@ public class ReferenceOperation
return new ReferenceOperation(receiver, kind, key, field, value);
}
+ public TxnReferenceOperation.Kind getKind()
+ {
+ return kind;
+ }
+
+ public ReferenceValue getValue()
+ {
+ return value;
+ }
+
public ColumnMetadata getReceiver()
{
return receiver;
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 50318a00d1..1315bed14f 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -400,7 +400,7 @@ public class AccordCQLTest extends AccordTestBase
" SELECT row1.v;\n" +
" UPDATE " + currentTable + " SET v " +
operation + " 1 WHERE k = 1;\n" +
"COMMIT TRANSACTION";
- assertRowEqualsWithPreemptedRetry(cluster, new Object[] {
startingValue }, update);
+ assertRowEquals(cluster, new Object[] { startingValue },
update);
String check = "BEGIN TRANSACTION\n" +
" SELECT v FROM " + currentTable + " WHERE k
= 1;\n" +
@@ -1424,7 +1424,7 @@ public class AccordCQLTest extends AccordTestBase
" SELECT row0.counter,
row0.other_counter;\n" +
" UPDATE " + currentTable + " SET
other_counter += 1, counter += row0.counter WHERE k = 0 AND c = 1;\n" +
"COMMIT TRANSACTION";
- assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 1,
1 }, update);
+ assertRowEquals(cluster, new Object[] { 1, 1 }, update);
String check = "BEGIN TRANSACTION\n" +
" SELECT counter, other_counter FROM " +
currentTable + " WHERE k = 0 AND c = 1;\n" +
@@ -1448,7 +1448,7 @@ public class AccordCQLTest extends AccordTestBase
" UPDATE " + currentTable + " SET
int_list[0] = 42 WHERE k = 0 AND c = 0;\n" +
" UPDATE " + currentTable + " SET counter +=
1 WHERE k = 0 AND c = 0;\n" +
"COMMIT TRANSACTION";
- assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0,
Arrays.asList(1, 2) }, update);
+ assertRowEquals(cluster, new Object[] { 0, Arrays.asList(1,
2) }, update);
String check = "BEGIN TRANSACTION\n" +
" SELECT counter, int_list FROM " +
currentTable + " WHERE k = 0 AND c = 0;\n" +
@@ -2255,7 +2255,7 @@ public class AccordCQLTest extends AccordTestBase
" UPDATE demo_ks.user_docs SET title='slides.key',
permissions=777 WHERE user='scott' AND doc_id=101;\n" +
" END IF\n" +
"COMMIT TRANSACTION";
- assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 5 },
addDoc);
+ assertRowEquals(SHARED_CLUSTER, new Object[] { 5 }, addDoc);
String addUser = "BEGIN TRANSACTION\n" +
" LET demo_doc = (SELECT * FROM demo_ks.org_docs
WHERE org_name='demo' LIMIT 1);\n" +
@@ -2267,7 +2267,7 @@ public class AccordCQLTest extends AccordTestBase
" UPDATE demo_ks.user_docs SET title='slides.key',
permissions=777 WHERE user='benedict' AND doc_id=101;\n" +
" END IF\n" +
"COMMIT TRANSACTION";
- assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 6 },
addUser);
+ assertRowEquals(SHARED_CLUSTER, new Object[] { 6 }, addUser);
}
// TODO: Implement support for basic arithmetic on references in INSERT
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 2c9b2a46ae..38cca646fc 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -25,17 +25,20 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
+import accord.primitives.Txn;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.cql3.transactions.ReferenceValue;
import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.util.QueryResultUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import accord.coordinate.Preempted;
import org.apache.cassandra.cql3.statements.ModificationStatement;
@@ -46,8 +49,10 @@ import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.AccordTestUtils;
import org.apache.cassandra.utils.AssertionUtils;
import org.apache.cassandra.utils.FailingConsumer;
+import org.assertj.core.util.Arrays;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.junit.Assert.assertArrayEquals;
@@ -59,6 +64,7 @@ public abstract class AccordTestBase extends TestBaseImpl
protected static Cluster SHARED_CLUSTER;
protected String currentTable;
+ private final Logger logger = LoggerFactory.getLogger(getClass());
@BeforeClass
public static void setupClass() throws IOException
@@ -125,39 +131,92 @@ public abstract class AccordTestBase extends TestBaseImpl
.start());
}
+ private static SimpleQueryResult execute(Cluster cluster, String check,
Object... boundValues)
+ {
+ return cluster.coordinator(1).executeWithResult(check,
ConsistencyLevel.ANY, boundValues);
+ }
+
+ protected static SimpleQueryResult assertRowEquals(Cluster cluster,
SimpleQueryResult expected, String check, Object... boundValues)
+ {
+ SimpleQueryResult result = execute(cluster, check, boundValues);
+ QueryResultUtil.assertThat(result).isEqualTo(expected);
+ return result;
+ }
+
+ protected static SimpleQueryResult assertRowEquals(Cluster cluster,
Object[] row, String check, Object... boundValues)
+ {
+ return assertRowEquals(cluster,
QueryResults.builder().row(row).build(), check, boundValues);
+ }
+
// TODO: Retry on preemption may become unnecessary after the Unified Log
is integrated.
- protected static SimpleQueryResult
assertRowEqualsWithPreemptedRetry(Cluster cluster, Object[] row, String check,
Object... boundValues)
+ protected SimpleQueryResult assertRowEqualsWithPreemptedRetry(Cluster
cluster, Object[] row, String check, Object... boundValues)
{
return assertRowWithPreemptedRetry(cluster,
QueryResults.builder().row(row).build(), check, boundValues);
}
- protected static SimpleQueryResult assertEmptyWithPreemptedRetry(Cluster
cluster, String check, Object... boundValues)
+ protected SimpleQueryResult assertEmptyWithPreemptedRetry(Cluster cluster,
String check, Object... boundValues)
{
return assertRowWithPreemptedRetry(cluster,
QueryResults.builder().build(), check, boundValues);
}
- private static SimpleQueryResult assertRowWithPreemptedRetry(Cluster
cluster, SimpleQueryResult expected, String check, Object... boundValues)
+ private SimpleQueryResult assertRowWithPreemptedRetry(Cluster cluster,
SimpleQueryResult expected, String check, Object... boundValues)
{
SimpleQueryResult result = executeWithRetry(cluster, check,
boundValues);
QueryResultUtil.assertThat(result).isEqualTo(expected);
return result;
}
- protected static SimpleQueryResult executeWithRetry(Cluster cluster,
String check, Object... boundValues)
+ private SimpleQueryResult executeWithRetry0(int count, Cluster cluster,
String check, Object... boundValues)
{
try
{
- return cluster.coordinator(1).executeWithResult(check,
ConsistencyLevel.ANY, boundValues);
+ return execute(cluster, check, boundValues);
}
catch (Throwable t)
{
if (AssertionUtils.rootCauseIs(Preempted.class).matches(t))
- return executeWithRetry(cluster, check, boundValues);
+ {
+ logger.warn("[Retry attempt={}] Preempted failure for {}",
count, check);
+ return executeWithRetry0(count + 1, cluster, check,
boundValues);
+ }
throw t;
}
}
+ protected SimpleQueryResult executeWithRetry(Cluster cluster, String
check, Object... boundValues)
+ {
+ // is this method safe?
+ cluster.get(1).runOnInstance(() -> {
+ TransactionStatement stmt = AccordTestUtils.parse(check);
+ if (!isIdempotent(stmt))
+ throw new AssertionError("Unable to retry txn that is not
idempotent: cql=" + check);
+ });
+ return executeWithRetry0(0, cluster, check, boundValues);
+ }
+
+ public static boolean isIdempotent(TransactionStatement statement)
+ {
+ for (ModificationStatement update : statement.getUpdates())
+ {
+ if (!isIdempotent(update))
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean isIdempotent(ModificationStatement update)
+ {
+ update.migrateReadRequiredOperations();
+ // ReferenceValue.Constant is used during migration, which means a
case like "a += 1"
+ // ReferenceValue.Substitution uses a LET reference, so rerunning
would always just see the new state
+ long numConstants = update.getSubstitutions().stream()
+ .filter(f -> f.getValue() instanceof
ReferenceValue.Constant)
+ .filter(f ->
!f.getKind().name().contains("Setter"))
+ .count();
+ return numConstants == 0;
+ }
+
public static class EnforceUpdateDoesNotPerformRead
{
public static void install(ClassLoader classLoader, Integer num)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]