This is an automated email from the ASF dual-hosted git repository.
jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 602ffcbf3e fix flaky o.a.c.distributed.test.PaxosRepair2Test.paxosRepairHistoryIsntUpdatedInForcedRepair
602ffcbf3e is described below
commit 602ffcbf3e4ead4732fdf46d506165f63d80a9a4
Author: Jon Meredith <[email protected]>
AuthorDate: Wed May 3 10:27:48 2023 -0600
fix flaky o.a.c.distributed.test.PaxosRepair2Test.paxosRepairHistoryIsntUpdatedInForcedRepair
patch by Jon Meredith; reviewed by Blake Eggleston for CASSANDRA-18047
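
The flakiness came from the old bounded loop that polled node1's failure detector at most ten times and could fall through before node3 was actually marked down; the patch replaces it with an Awaitility wait of up to one minute, and also wraps the legacyPurgeRepairLoop clock-property setup in a try/finally so CLOCK_GLOBAL is always reset. A minimal sketch of the new waiting pattern (the cluster and node3 variables are the ones used in the test below):

    InetAddressAndPort node3 = InetAddressAndPort.getByAddress(cluster.get(3).broadcastAddress());
    // block until node1's failure detector reports node3 as down, or fail after one minute
    Awaitility.waitAtMost(1, TimeUnit.MINUTES)
              .until(() -> !cluster.get(1).callOnInstance(() -> FailureDetector.instance.isAlive(node3)));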
---
.../distributed/test/PaxosRepair2Test.java | 250 ++++++++++-----------
1 file changed, 124 insertions(+), 126 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
index 8371bd4c4e..574b84f2eb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
@@ -18,19 +18,16 @@
package org.apache.cassandra.distributed.test;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -68,7 +65,6 @@ import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.RepairParallelism;
@@ -106,6 +102,7 @@ public class PaxosRepair2Test extends TestBaseImpl
{
private static final Logger logger = LoggerFactory.getLogger(PaxosRepair2Test.class);
private static final String TABLE = "tbl";
+ public static final String OFFSETTABLE_CLOCK_NAME = OffsettableClock.class.getName();
static
{
@@ -125,19 +122,6 @@ public class PaxosRepair2Test extends TestBaseImpl
return uncommitted;
}
- private static void assertAllAlive(Cluster cluster)
- {
- Set<InetAddress> allEndpoints = cluster.stream().map(i -> i.broadcastAddress().getAddress()).collect(Collectors.toSet());
- cluster.stream().forEach(instance -> {
- instance.runOnInstance(() -> {
- ImmutableSet<InetAddressAndPort> endpoints = Gossiper.instance.getEndpoints();
- Assert.assertEquals(allEndpoints, endpoints);
- for (InetAddressAndPort endpoint : endpoints)
- Assert.assertTrue(FailureDetector.instance.isAlive(endpoint));
- });
- });
- }
-
private static void assertUncommitted(IInvokableInstance instance, String ks, String table, int expected)
{
Assert.assertEquals(expected, getUncommitted(instance, ks, table));
@@ -255,19 +239,18 @@ public class PaxosRepair2Test extends TestBaseImpl
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + "
(k int primary key, v int)");
- cluster.get(3).shutdown();
+ cluster.get(3).shutdown().get();
InetAddressAndPort node3 =
InetAddressAndPort.getByAddress(cluster.get(3).broadcastAddress());
- for (int i = 0; i < 10; i++)
- {
- if (!cluster.get(1).callOnInstance(() ->
FailureDetector.instance.isAlive(node3)))
- break;
- }
+ // make sure node1 knows node3 is down
+ Awaitility.waitAtMost(1,TimeUnit.MINUTES).until(
+ () -> !cluster.get(1).callOnInstance(() ->
FailureDetector.instance.isAlive(node3)));
repair(cluster, KEYSPACE, TABLE, true);
for (int i = 0; i < cluster.size() - 1; i++)
{
cluster.get(i + 1).runOnInstance(() -> {
+ Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
DecoratedKey key = cfs.decorateKey(ByteBufferUtil.bytes(1));
Assert.assertTrue(FBUtilities.getBroadcastAddressAndPort().toString(), Commit.isAfter(staleBallot, cfs.getPaxosRepairLowBound(key)));
@@ -355,8 +338,13 @@ public class PaxosRepair2Test extends TestBaseImpl
.set("truncate_request_timeout_in_ms", 1000L)))
)
{
+ cluster.forEach(i -> {
+ Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
+ Assert.assertEquals("1", System.getProperty("cassandra.auto_repair_frequency_seconds"));
+ Assert.assertEquals("true", System.getProperty("cassandra.disable_paxos_auto_repairs"));
+ });
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
- cluster.get(3).shutdown();
+ cluster.get(3).shutdown().get();
cluster.verbs(Verb.PAXOS_COMMIT_REQ).drop();
try
{
@@ -412,6 +400,7 @@ public class PaxosRepair2Test extends TestBaseImpl
repair(cluster, KEYSPACE, TABLE);
cluster.forEach(i -> i.runOnInstance(() -> {
+ Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
compactPaxos();
Map<Integer, PaxosRow> rows = getPaxosRows();
Assert.assertEquals(Sets.newHashSet(1), rows.keySet());
@@ -470,116 +459,125 @@ public class PaxosRepair2Test extends TestBaseImpl
@Test
public void legacyPurgeRepairLoop() throws Exception
{
- CassandraRelevantProperties.CLOCK_GLOBAL.setString(OffsettableClock.class.getName());
- try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
- .set("paxos_variant", "v2")
- .set("paxos_state_purging", "legacy")
- .set("paxos_purge_grace_period", "0s")
- .set("truncate_request_timeout_in_ms", 1000L)))
- )
+ try
{
- int ttl = 3*3600;
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + "
(pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH gc_grace_seconds=" + ttl);
-
- // prepare an operation ttl + 1 hour into the past on a single node
- cluster.forEach(instance -> {
- instance.runOnInstance(() -> {
- backdateTimestamps(ttl + 3600);
+ CassandraRelevantProperties.CLOCK_GLOBAL.setString(OFFSETTABLE_CLOCK_NAME);
+ try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
+ .set("paxos_variant", "v2")
+ .set("paxos_state_purging", "legacy")
+ .set("paxos_purge_grace_period", "0s")
+ .set("truncate_request_timeout_in_ms", 1000L)))
+ )
+ {
+ cluster.forEach(i -> Assert.assertEquals(OFFSETTABLE_CLOCK_NAME, CassandraRelevantProperties.CLOCK_GLOBAL.getString()));
+ int ttl = 3 * 3600;
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH gc_grace_seconds=" + ttl);
+
+ // prepare an operation ttl + 1 hour into the past on a single node
+ cluster.forEach(instance -> {
+ instance.runOnInstance(() -> {
+ Assert.assertEquals(OFFSETTABLE_CLOCK_NAME, CassandraRelevantProperties.CLOCK_GLOBAL.getString());
+ backdateTimestamps(ttl + 3600);
+ });
});
- });
- cluster.filters().inbound().to(1, 2).drop();
- assertTimeout(() -> cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM));
- Ballot oldBallot = Ballot.fromUuid(cluster.get(3).callOnInstance(() -> {
- TableMetadata cfm = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
- DecoratedKey dk = cfm.partitioner.decorateKey(ByteBufferUtil.bytes(400));
- try (PaxosState state = PaxosState.get(dk, cfm))
- {
- return state.currentSnapshot().promised.asUUID();
- }
- }));
-
- assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
- assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
- assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 1);
-
- // commit an operation just over ttl in the past on the other nodes
- cluster.filters().reset();
- cluster.filters().inbound().to(2).drop();
- cluster.forEach(instance -> {
- instance.runOnInstance(() -> {
- backdateTimestamps(ttl + 2);
+ cluster.filters().inbound().to(1, 2).drop();
+ assertTimeout(() -> cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM));
+ Ballot oldBallot = Ballot.fromUuid(cluster.get(3).callOnInstance(() -> {
+ TableMetadata cfm = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+ DecoratedKey dk = cfm.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+ try (PaxosState state = PaxosState.get(dk, cfm))
+ {
+ return state.currentSnapshot().promised.asUUID();
+ }
+ }));
+
+ assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
+ assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
+ assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 1);
+
+ // commit an operation just over ttl in the past on the other nodes
+ cluster.filters().reset();
+ cluster.filters().inbound().to(2).drop();
+ cluster.forEach(instance -> {
+ instance.runOnInstance(() -> {
+ backdateTimestamps(ttl + 2);
+ });
+ });
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+
+ // expire the cache entries
+ int nowInSec = FBUtilities.nowInSeconds();
+ cluster.get(1).runOnInstance(() -> {
+ TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+ DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+ try (PaxosState state = PaxosState.get(dk, table))
+ {
+ state.updateStateUnsafe(s -> {
+ Assert.assertNull(s.accepted);
+ Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
+ Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
+ s.committed.update,
+ ballotDeletion(s.committed));
+ Assert.assertTrue(committed.localDeletionTime < nowInSec);
+ return new PaxosState.Snapshot(Ballot.none(), Ballot.none(), null, committed);
+ });
+ }
});
- });
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM);
- // expire the cache entries
- int nowInSec = FBUtilities.nowInSeconds();
- cluster.get(1).runOnInstance(() -> {
- TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
- DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
- try (PaxosState state = PaxosState.get(dk, table))
- {
- state.updateStateUnsafe(s -> {
- Assert.assertNull(s.accepted);
- Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
- Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
- s.committed.update,
- ballotDeletion(s.committed));
- Assert.assertTrue(committed.localDeletionTime < nowInSec);
- return new PaxosState.Snapshot(Ballot.none(), Ballot.none(), null, committed);
- });
- }
- });
+ cluster.get(3).runOnInstance(() -> {
+ TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+ DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+ try (PaxosState state = PaxosState.get(dk, table))
+ {
+ state.updateStateUnsafe(s -> {
+ Assert.assertNull(s.accepted);
+ Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
+ Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
+ s.committed.update,
+ ballotDeletion(s.committed));
+ Assert.assertTrue(committed.localDeletionTime < nowInSec);
+ return new PaxosState.Snapshot(oldBallot, oldBallot, null, committed);
+ });
+ }
+ });
- cluster.get(3).runOnInstance(() -> {
- TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
- DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
- try (PaxosState state = PaxosState.get(dk, table))
- {
- state.updateStateUnsafe(s -> {
- Assert.assertNull(s.accepted);
- Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
- Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
- s.committed.update,
- ballotDeletion(s.committed));
- Assert.assertTrue(committed.localDeletionTime < nowInSec);
- return new PaxosState.Snapshot(oldBallot, oldBallot, null, committed);
+ cluster.forEach(instance -> {
+ instance.runOnInstance(() -> {
+ backdateTimestamps(0);
});
- }
- });
-
- cluster.forEach(instance -> {
- instance.runOnInstance(() -> {
- backdateTimestamps(0);
});
- });
-
- cluster.filters().reset();
- cluster.filters().inbound().to(2).drop();
- cluster.get(3).runOnInstance(() -> {
-
- TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
- DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
- UpdateSupplier supplier = PaxosState.uncommittedTracker().unsafGetUpdateSupplier();
- try
- {
- PaxosUncommittedTracker.unsafSetUpdateSupplier(new SingleUpdateSupplier(table, dk, oldBallot));
- StorageService.instance.autoRepairPaxos(table.id).get();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- PaxosUncommittedTracker.unsafSetUpdateSupplier(supplier);
- }
- });
+ cluster.filters().reset();
+ cluster.filters().inbound().to(2).drop();
+ cluster.get(3).runOnInstance(() -> {
+
+ TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+ DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+
+ UpdateSupplier supplier = PaxosState.uncommittedTracker().unsafGetUpdateSupplier();
+ try
+ {
+ PaxosUncommittedTracker.unsafSetUpdateSupplier(new SingleUpdateSupplier(table, dk, oldBallot));
+ StorageService.instance.autoRepairPaxos(table.id).get();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ PaxosUncommittedTracker.unsafSetUpdateSupplier(supplier);
+ }
+ });
- assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
- assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
- assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 0);
+ assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
+ assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
+ assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 0);
+ }
+ }
+ finally
+ {
+ CassandraRelevantProperties.CLOCK_GLOBAL.reset();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]