This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 602ffcbf3e fix flaky o.a.c.distributed.test.PaxosRepair2Test.paxosRepairHistoryIsntUpdatedInForcedRepair
602ffcbf3e is described below

commit 602ffcbf3e4ead4732fdf46d506165f63d80a9a4
Author: Jon Meredith <[email protected]>
AuthorDate: Wed May 3 10:27:48 2023 -0600

    fix flaky o.a.c.distributed.test.PaxosRepair2Test.paxosRepairHistoryIsntUpdatedInForcedRepair
    
    patch by Jon Meredith; reviewed by Blake Eggleston for CASSANDRA-18047
---
 .../distributed/test/PaxosRepair2Test.java         | 250 ++++++++++-----------
 1 file changed, 124 insertions(+), 126 deletions(-)
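The core of the flakiness fix in the diff below is replacing a bounded for loop (which polled the failure detector up to ten times with no delay and could fall through before node1 had marked node3 down) with an Awaitility condition that keeps polling until the failure detector actually reports the node as dead, or fails after one minute. A minimal, self-contained sketch of that pattern follows; the node3MarkedDown flag and the background thread are stand-ins for the in-JVM dtest cluster and its failure detector, not part of the patch itself.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.awaitility.Awaitility;

    public class AwaitConditionSketch
    {
        public static void main(String[] args)
        {
            // Stand-in for "node1 sees node3 as down"; the real test calls
            // cluster.get(1).callOnInstance(() -> FailureDetector.instance.isAlive(node3)).
            AtomicBoolean node3MarkedDown = new AtomicBoolean(false);

            // Simulate the failure detector flipping after a short delay.
            new Thread(() -> {
                try { Thread.sleep(500); } catch (InterruptedException ignored) {}
                node3MarkedDown.set(true);
            }).start();

            // Poll until the condition holds, failing loudly after one minute
            // instead of silently giving up after a fixed number of iterations.
            Awaitility.waitAtMost(1, TimeUnit.MINUTES)
                      .until(node3MarkedDown::get);

            System.out.println("node3 observed as down");
        }
    }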

diff --git a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
index 8371bd4c4e..574b84f2eb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
@@ -18,19 +18,16 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -68,7 +65,6 @@ import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.exceptions.CasWriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.RepairParallelism;
@@ -106,6 +102,7 @@ public class PaxosRepair2Test extends TestBaseImpl
 {
     private static final Logger logger = LoggerFactory.getLogger(PaxosRepair2Test.class);
     private static final String TABLE = "tbl";
+    public static final String OFFSETTABLE_CLOCK_NAME = OffsettableClock.class.getName();
 
     static
     {
@@ -125,19 +122,6 @@ public class PaxosRepair2Test extends TestBaseImpl
         return uncommitted;
     }
 
-    private static void assertAllAlive(Cluster cluster)
-    {
-        Set<InetAddress> allEndpoints = cluster.stream().map(i -> i.broadcastAddress().getAddress()).collect(Collectors.toSet());
-        cluster.stream().forEach(instance -> {
-            instance.runOnInstance(() -> {
-                ImmutableSet<InetAddressAndPort> endpoints = Gossiper.instance.getEndpoints();
-                Assert.assertEquals(allEndpoints, endpoints);
-                for (InetAddressAndPort endpoint : endpoints)
-                    Assert.assertTrue(FailureDetector.instance.isAlive(endpoint));
-            });
-        });
-    }
-
     private static void assertUncommitted(IInvokableInstance instance, String ks, String table, int expected)
     {
         Assert.assertEquals(expected, getUncommitted(instance, ks, table));
@@ -255,19 +239,18 @@ public class PaxosRepair2Test extends TestBaseImpl
         )
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (k int primary key, v int)");
-            cluster.get(3).shutdown();
+            cluster.get(3).shutdown().get();
             InetAddressAndPort node3 = InetAddressAndPort.getByAddress(cluster.get(3).broadcastAddress());
 
-            for (int i = 0; i < 10; i++)
-            {
-                if (!cluster.get(1).callOnInstance(() -> FailureDetector.instance.isAlive(node3)))
-                    break;
-            }
+            // make sure node1 knows node3 is down
+            Awaitility.waitAtMost(1,TimeUnit.MINUTES).until(
+            () -> !cluster.get(1).callOnInstance(() -> FailureDetector.instance.isAlive(node3)));
 
             repair(cluster, KEYSPACE, TABLE, true);
             for (int i = 0; i < cluster.size() - 1; i++)
             {
                 cluster.get(i + 1).runOnInstance(() -> {
+                    Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
                     ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
                     DecoratedKey key = cfs.decorateKey(ByteBufferUtil.bytes(1));
                     Assert.assertTrue(FBUtilities.getBroadcastAddressAndPort().toString(), Commit.isAfter(staleBallot, cfs.getPaxosRepairLowBound(key)));
@@ -355,8 +338,13 @@ public class PaxosRepair2Test extends TestBaseImpl
                                                              .set("truncate_request_timeout_in_ms", 1000L)))
         )
         {
+            cluster.forEach(i -> {
+                Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
+                Assert.assertEquals("1", System.getProperty("cassandra.auto_repair_frequency_seconds"));
+                Assert.assertEquals("true", System.getProperty("cassandra.disable_paxos_auto_repairs"));
+            });
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-            cluster.get(3).shutdown();
+            cluster.get(3).shutdown().get();
             cluster.verbs(Verb.PAXOS_COMMIT_REQ).drop();
             try
             {
@@ -412,6 +400,7 @@ public class PaxosRepair2Test extends TestBaseImpl
 
             repair(cluster, KEYSPACE, TABLE);
             cluster.forEach(i -> i.runOnInstance(() -> {
+                Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
                 compactPaxos();
                 Map<Integer, PaxosRow> rows = getPaxosRows();
                 Assert.assertEquals(Sets.newHashSet(1), rows.keySet());
@@ -470,116 +459,125 @@ public class PaxosRepair2Test extends TestBaseImpl
     @Test
     public void legacyPurgeRepairLoop() throws Exception
     {
-        CassandraRelevantProperties.CLOCK_GLOBAL.setString(OffsettableClock.class.getName());
-        try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
-                                                             .set("paxos_variant", "v2")
-                                                             .set("paxos_state_purging", "legacy")
-                                                             .set("paxos_purge_grace_period", "0s")
-                                                             .set("truncate_request_timeout_in_ms", 1000L)))
-        )
+        try
         {
-            int ttl = 3*3600;
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH gc_grace_seconds=" + ttl);
-
-            // prepare an operation ttl + 1 hour into the past on a single node
-            cluster.forEach(instance -> {
-                instance.runOnInstance(() -> {
-                    backdateTimestamps(ttl + 3600);
+            CassandraRelevantProperties.CLOCK_GLOBAL.setString(OFFSETTABLE_CLOCK_NAME);
+            try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
+                                                                 .set("paxos_variant", "v2")
+                                                                 .set("paxos_state_purging", "legacy")
+                                                                 .set("paxos_purge_grace_period", "0s")
+                                                                 .set("truncate_request_timeout_in_ms", 1000L)))
+            )
+            {
+                cluster.forEach(i -> Assert.assertEquals(OFFSETTABLE_CLOCK_NAME, CassandraRelevantProperties.CLOCK_GLOBAL.getString()));
+                int ttl = 3 * 3600;
+                cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH gc_grace_seconds=" + ttl);
+
+                // prepare an operation ttl + 1 hour into the past on a single node
+                cluster.forEach(instance -> {
+                    instance.runOnInstance(() -> {
+                        Assert.assertEquals(OFFSETTABLE_CLOCK_NAME, CassandraRelevantProperties.CLOCK_GLOBAL.getString());
+                        backdateTimestamps(ttl + 3600);
+                    });
                 });
-            });
-            cluster.filters().inbound().to(1, 2).drop();
-            assertTimeout(() -> cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM));
-            Ballot oldBallot = Ballot.fromUuid(cluster.get(3).callOnInstance(() -> {
-                TableMetadata cfm = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
-                DecoratedKey dk = cfm.partitioner.decorateKey(ByteBufferUtil.bytes(400));
-                try (PaxosState state = PaxosState.get(dk, cfm))
-                {
-                    return state.currentSnapshot().promised.asUUID();
-                }
-            }));
-
-            assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
-            assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
-            assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 1);
-
-            // commit an operation just over ttl in the past on the other nodes
-            cluster.filters().reset();
-            cluster.filters().inbound().to(2).drop();
-            cluster.forEach(instance -> {
-                instance.runOnInstance(() -> {
-                    backdateTimestamps(ttl + 2);
+                cluster.filters().inbound().to(1, 2).drop();
+                assertTimeout(() -> cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM));
+                Ballot oldBallot = Ballot.fromUuid(cluster.get(3).callOnInstance(() -> {
+                    TableMetadata cfm = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+                    DecoratedKey dk = cfm.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+                    try (PaxosState state = PaxosState.get(dk, cfm))
+                    {
+                        return state.currentSnapshot().promised.asUUID();
+                    }
+                }));
+
+                assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
+                assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
+                assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 1);
+
+                // commit an operation just over ttl in the past on the other nodes
+                cluster.filters().reset();
+                cluster.filters().inbound().to(2).drop();
+                cluster.forEach(instance -> {
+                    instance.runOnInstance(() -> {
+                        backdateTimestamps(ttl + 2);
+                    });
+                });
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+
+                // expire the cache entries
+                int nowInSec = FBUtilities.nowInSeconds();
+                cluster.get(1).runOnInstance(() -> {
+                    TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+                    DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+                    try (PaxosState state = PaxosState.get(dk, table))
+                    {
+                        state.updateStateUnsafe(s -> {
+                            Assert.assertNull(s.accepted);
+                            Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
+                            Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
+                                                                                            s.committed.update,
+                                                                                            ballotDeletion(s.committed));
+                            Assert.assertTrue(committed.localDeletionTime < nowInSec);
+                            return new PaxosState.Snapshot(Ballot.none(), Ballot.none(), null, committed);
+                        });
+                    }
                 });
-            });
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM);
 
-            // expire the cache entries
-            int nowInSec = FBUtilities.nowInSeconds();
-            cluster.get(1).runOnInstance(() -> {
-                TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
-                DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
-                try (PaxosState state = PaxosState.get(dk, table))
-                {
-                    state.updateStateUnsafe(s -> {
-                        Assert.assertNull(s.accepted);
-                        Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
-                        Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
-                                                                                        s.committed.update,
-                                                                                        ballotDeletion(s.committed));
-                        Assert.assertTrue(committed.localDeletionTime < nowInSec);
-                        return new PaxosState.Snapshot(Ballot.none(), Ballot.none(), null, committed);
-                    });
-                }
-            });
+                cluster.get(3).runOnInstance(() -> {
+                    TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+                    DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+                    try (PaxosState state = PaxosState.get(dk, table))
+                    {
+                        state.updateStateUnsafe(s -> {
+                            Assert.assertNull(s.accepted);
+                            Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
+                            Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
+                                                                                            s.committed.update,
+                                                                                            ballotDeletion(s.committed));
+                            Assert.assertTrue(committed.localDeletionTime < nowInSec);
+                            return new PaxosState.Snapshot(oldBallot, oldBallot, null, committed);
+                        });
+                    }
+                });
 
-            cluster.get(3).runOnInstance(() -> {
-                TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
-                DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
-                try (PaxosState state = PaxosState.get(dk, table))
-                {
-                    state.updateStateUnsafe(s -> {
-                        Assert.assertNull(s.accepted);
-                        Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
-                        Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
-                                                                                        s.committed.update,
-                                                                                        ballotDeletion(s.committed));
-                        Assert.assertTrue(committed.localDeletionTime < nowInSec);
-                        return new PaxosState.Snapshot(oldBallot, oldBallot, null, committed);
+                cluster.forEach(instance -> {
+                    instance.runOnInstance(() -> {
+                        backdateTimestamps(0);
                     });
-                }
-            });
-
-            cluster.forEach(instance -> {
-                instance.runOnInstance(() -> {
-                    backdateTimestamps(0);
                 });
-            });
-
-            cluster.filters().reset();
-            cluster.filters().inbound().to(2).drop();
-            cluster.get(3).runOnInstance(() -> {
-
-                TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
-                DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
 
-                UpdateSupplier supplier = PaxosState.uncommittedTracker().unsafGetUpdateSupplier();
-                try
-                {
-                    PaxosUncommittedTracker.unsafSetUpdateSupplier(new SingleUpdateSupplier(table, dk, oldBallot));
-                    StorageService.instance.autoRepairPaxos(table.id).get();
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-                finally
-                {
-                    PaxosUncommittedTracker.unsafSetUpdateSupplier(supplier);
-                }
-            });
+                cluster.filters().reset();
+                cluster.filters().inbound().to(2).drop();
+                cluster.get(3).runOnInstance(() -> {
+
+                    TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+                    DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
+
+                    UpdateSupplier supplier = PaxosState.uncommittedTracker().unsafGetUpdateSupplier();
+                    try
+                    {
+                        PaxosUncommittedTracker.unsafSetUpdateSupplier(new SingleUpdateSupplier(table, dk, oldBallot));
+                        StorageService.instance.autoRepairPaxos(table.id).get();
+                    }
+                    catch (Exception e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    finally
+                    {
+                        PaxosUncommittedTracker.unsafSetUpdateSupplier(supplier);
+                    }
+                });
 
-            assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
-            assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
-            assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 0);
+                assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
+                assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
+                assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 0);
+            }
+        }
+        finally
+        {
+            CassandraRelevantProperties.CLOCK_GLOBAL.reset();
         }
     }
 

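Aside from the Awaitility change, the recurring edit in this patch is scoping the CLOCK_GLOBAL override: legacyPurgeRepairLoop now sets the property inside a try block and unconditionally calls CassandraRelevantProperties.CLOCK_GLOBAL.reset() in a finally, so a failing run can no longer leak the OffsettableClock into later tests, which appears to be what the new CLOCK_GLOBAL.isPresent() assertions in the other tests guard against. A generic sketch of the same set-then-restore pattern is below; the property key "example.clock.impl" and the withProperty helper are purely illustrative, not Cassandra APIs.

    import java.util.concurrent.Callable;

    public class ScopedSystemPropertySketch
    {
        // Hypothetical property key, used only for this example.
        private static final String CLOCK_PROPERTY = "example.clock.impl";

        // Runs a task with a system property set, restoring the previous state afterwards,
        // analogous to the try/finally + reset() added around legacyPurgeRepairLoop.
        static <T> T withProperty(String key, String value, Callable<T> task) throws Exception
        {
            String previous = System.getProperty(key);
            System.setProperty(key, value);
            try
            {
                return task.call();
            }
            finally
            {
                if (previous == null)
                    System.clearProperty(key);
                else
                    System.setProperty(key, previous);
            }
        }

        public static void main(String[] args) throws Exception
        {
            String seen = withProperty(CLOCK_PROPERTY, "OffsettableClock", () -> System.getProperty(CLOCK_PROPERTY));
            System.out.println("inside: " + seen + ", after: " + System.getProperty(CLOCK_PROPERTY));
        }
    }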

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
