This is an automated email from the ASF dual-hosted git repository.

stefania pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4e640a  Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and ConsistentSession
b4e640a is described below

commit b4e640a96e76f8d4a45937b1312b64ddc1aeb8ac
Author: Aleksandr Sorokoumov <[email protected]>
AuthorDate: Tue Mar 31 15:53:51 2020 +0200

    Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and ConsistentSession
    
    patch by Aleksandr Sorokoumov; reviewed by Stefania Alborghetti for CASSANDRA-15672
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/net/OutboundSink.java     |  2 +-
 .../repair/consistent/ConsistentSession.java       |  8 +--
 .../consistent/CoordinatorMessagingTest.java       | 70 +++++++++++++++-------
 4 files changed, 53 insertions(+), 28 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 65111d0..fb881de 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and ConsistentSession (CASSANDRA-15672)
  * Fix force compaction of wrapping ranges (CASSANDRA-15664)
  * Expose repair streaming metrics (CASSANDRA-15656)
  * Set now in seconds in the future for validation repairs (CASSANDRA-15655)
diff --git a/src/java/org/apache/cassandra/net/OutboundSink.java b/src/java/org/apache/cassandra/net/OutboundSink.java
index d19b3e2..34c72db 100644
--- a/src/java/org/apache/cassandra/net/OutboundSink.java
+++ b/src/java/org/apache/cassandra/net/OutboundSink.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 /**
  * A message sink that all outbound messages go through.
  *
- * Default sink {@link Sink} used by {@link MessagingService} is MessagingService#doSend(), which proceeds to
+ * Default sink {@link Sink} used by {@link MessagingService} is {@link MessagingService#doSend(Message, InetAddressAndPort, ConnectionType)}, which proceeds to
  * send messages over the network, but it can be overridden to filter out 
certain messages, record the fact
  * of attempted delivery, or delay they delivery.
  *
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index 03de157..d9ac927 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -56,13 +56,13 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  * There are 4 stages to a consistent incremental repair.
  *
  * <h1>Repair prepare</h1>
- *  First, the normal {@link ActiveRepairService#prepareForRepair(UUID, 
InetAddressAndPort, Set, RepairOption, List)} stuff
+ *  First, the normal {@link ActiveRepairService#prepareForRepair(UUID, 
InetAddressAndPort, Set, RepairOption, boolean, List)} stuff
  *  happens, which sends out {@link PrepareMessage} and creates a {@link 
ActiveRepairService.ParentRepairSession}
  *  on the coordinator and each of the neighbors.
  *
  * <h1>Consistent prepare</h1>
  *  The consistent prepare step promotes the parent repair session to a 
consistent session, and isolates the sstables
- *  being repaired other sstables. First, the coordinator sends a {@link 
PrepareConsistentRequest} message to each repair
+ *  being repaired from  other sstables. First, the coordinator sends a {@link 
PrepareConsistentRequest} message to each repair
  *  participant (including itself). When received, the node creates a {@link 
LocalSession} instance, sets it's state to
  *  {@code PREPARING}, persists it, and begins a preparing the tables for 
incremental repair, which segregates the data
  *  being repaired from the rest of the table data. When the preparation 
completes, the session state is set to
@@ -74,7 +74,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  Once the coordinator recieves positive {@code PrepareConsistentResponse} 
messages from all the participants, the
  *  coordinator begins the normal repair process.
  *  <p/>
- *  (see {@link CoordinatorSession#handlePrepareResponse(InetAddress, boolean)}
+ *  (see {@link CoordinatorSession#handlePrepareResponse(InetAddressAndPort, 
boolean)}
  *
  * <h1>Repair</h1>
  *  The coordinator runs the normal data repair process against the sstables 
segregated in the previous step. When a
@@ -96,7 +96,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  conflicts with in progress compactions. The sstables will be marked 
repaired as part of the normal compaction process.
  *  <p/>
  *
- *  On the coordinator side, see {@link CoordinatorSession#finalizePropose()}, 
{@link CoordinatorSession#handleFinalizePromise(InetAddress, boolean)},
+ *  On the coordinator side, see {@link CoordinatorSession#finalizePropose()}, 
{@link CoordinatorSession#handleFinalizePromise(InetAddressAndPort, boolean)},
  *  & {@link CoordinatorSession#finalizeCommit()}
  *  <p/>
  *
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index c9fd913..6f0d846 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -43,14 +44,11 @@ import 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MockMessagingService;
 import org.apache.cassandra.net.MockMessagingSpy;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.repair.RepairSessionResult;
-import org.apache.cassandra.repair.messages.FailSession;
-import org.apache.cassandra.repair.messages.FinalizeCommit;
 import org.apache.cassandra.repair.messages.FinalizePromise;
 import org.apache.cassandra.repair.messages.FinalizePropose;
 import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
@@ -96,9 +94,11 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
     @Test
     public void testMockedMessagingHappyPath() throws InterruptedException, 
ExecutionException, TimeoutException
     {
+        CountDownLatch prepareLatch = createLatch();
+        CountDownLatch finalizeLatch = createLatch();
 
-        MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(), 
Collections.emptySet());
-        MockMessagingSpy spyFinalize = 
createFinalizeSpy(Collections.emptySet(), Collections.emptySet());
+        MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(), 
Collections.emptySet(), prepareLatch);
+        MockMessagingSpy spyFinalize = 
createFinalizeSpy(Collections.emptySet(), Collections.emptySet(), 
finalizeLatch);
         MockMessagingSpy spyCommit = createCommitSpy();
 
         UUID uuid = registerSession(cfs, true, true);
@@ -120,6 +120,7 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
         Assert.assertFalse(sessionResult.isDone());
         Assert.assertFalse(hasFailures.get());
         // prepare completed
+        prepareLatch.countDown();
         spyPrepare.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
         Assert.assertFalse(sessionResult.isDone());
         Assert.assertFalse(hasFailures.get());
@@ -128,9 +129,8 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
         repairFuture.set(Lists.newArrayList(createResult(coordinator), 
createResult(coordinator), createResult(coordinator)));
 
         // finalize phase
+        finalizeLatch.countDown();
         spyFinalize.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
-        Assert.assertFalse(sessionResult.isDone());
-        Assert.assertFalse(hasFailures.get());
 
         // commit phase
         spyCommit.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
@@ -150,39 +150,44 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
     @Test
     public void testMockedMessagingPrepareFailureP1() throws 
InterruptedException, ExecutionException, TimeoutException
     {
-        createPrepareSpy(Collections.singleton(PARTICIPANT1), 
Collections.emptySet());
-        testMockedMessagingPrepareFailure();
+        CountDownLatch latch = createLatch();
+        createPrepareSpy(Collections.singleton(PARTICIPANT1), 
Collections.emptySet(), latch);
+        testMockedMessagingPrepareFailure(latch);
     }
 
     @Test
     public void testMockedMessagingPrepareFailureP12() throws 
InterruptedException, ExecutionException, TimeoutException
     {
-        createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2), 
Collections.emptySet());
-        testMockedMessagingPrepareFailure();
+        CountDownLatch latch = createLatch();
+        createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2), 
Collections.emptySet(), latch);
+        testMockedMessagingPrepareFailure(latch);
     }
 
     @Test
     public void testMockedMessagingPrepareFailureP3() throws 
InterruptedException, ExecutionException, TimeoutException
     {
-        createPrepareSpy(Collections.singleton(PARTICIPANT3), 
Collections.emptySet());
-        testMockedMessagingPrepareFailure();
+        CountDownLatch latch = createLatch();
+        createPrepareSpy(Collections.singleton(PARTICIPANT3), 
Collections.emptySet(), latch);
+        testMockedMessagingPrepareFailure(latch);
     }
 
     @Test
     public void testMockedMessagingPrepareFailureP123() throws 
InterruptedException, ExecutionException, TimeoutException
     {
-        createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3), Collections.emptySet());
-        testMockedMessagingPrepareFailure();
+        CountDownLatch latch = createLatch();
+        createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3), Collections.emptySet(), latch);
+        testMockedMessagingPrepareFailure(latch);
     }
 
     @Test(expected = TimeoutException.class)
     public void testMockedMessagingPrepareFailureWrongSessionId() throws 
InterruptedException, ExecutionException, TimeoutException
     {
-        createPrepareSpy(Collections.singleton(PARTICIPANT1), 
Collections.emptySet(), (msgOut) -> UUID.randomUUID());
-        testMockedMessagingPrepareFailure();
+        CountDownLatch latch = createLatch();
+        createPrepareSpy(Collections.singleton(PARTICIPANT1), 
Collections.emptySet(), (msgOut) -> UUID.randomUUID(), latch);
+        testMockedMessagingPrepareFailure(latch);
     }
 
-    private void testMockedMessagingPrepareFailure() throws 
InterruptedException, ExecutionException, TimeoutException
+    private void testMockedMessagingPrepareFailure(CountDownLatch 
prepareLatch) throws InterruptedException, ExecutionException, TimeoutException
     {
         // we expect FailSession messages to all participants
         MockMessagingSpy sendFailSessionExpectedSpy = 
createFailSessionSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3));
@@ -204,6 +209,7 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
         // execute repair and start prepare phase
         ListenableFuture<Boolean> sessionResult = 
coordinator.execute(sessionSupplier, proposeFailed);
         Assert.assertFalse(proposeFailed.get());
+        prepareLatch.countDown();
         // prepare completed
         try
         {
@@ -223,7 +229,7 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
     @Test
     public void testMockedMessagingPrepareTimeout() throws 
InterruptedException, ExecutionException, TimeoutException
     {
-        MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(), 
Collections.singleton(PARTICIPANT3));
+        MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(), 
Collections.singleton(PARTICIPANT3), new CountDownLatch(0));
         MockMessagingSpy sendFailSessionUnexpectedSpy = 
createFailSessionSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3));
 
         UUID uuid = registerSession(cfs, true, true);
@@ -265,17 +271,24 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
     }
 
     private MockMessagingSpy createPrepareSpy(Collection<InetAddressAndPort> 
failed,
-                                              Collection<InetAddressAndPort> 
timeout)
+                                              Collection<InetAddressAndPort> 
timeout,
+                                              CountDownLatch latch)
     {
-        return createPrepareSpy(failed, timeout, (msgOut) -> 
msgOut.parentSession);
+        return createPrepareSpy(failed, timeout, (msgOut) -> 
msgOut.parentSession, latch);
     }
 
     private MockMessagingSpy createPrepareSpy(Collection<InetAddressAndPort> 
failed,
                                               Collection<InetAddressAndPort> 
timeout,
-                                              
Function<PrepareConsistentRequest, UUID> sessionIdFunc)
+                                              
Function<PrepareConsistentRequest, UUID> sessionIdFunc,
+                                              CountDownLatch latch)
     {
         return 
MockMessagingService.when(verb(Verb.PREPARE_CONSISTENT_REQ)).respond((msgOut, 
to) ->
         {
+            try
+            {
+                latch.await();
+            }
+            catch (InterruptedException e) { }
             if (timeout.contains(to))
                 return null;
 
@@ -285,10 +298,16 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
     }
 
     private MockMessagingSpy createFinalizeSpy(Collection<InetAddressAndPort> 
failed,
-                                               Collection<InetAddressAndPort> 
timeout)
+                                               Collection<InetAddressAndPort> 
timeout,
+                                               CountDownLatch latch)
     {
         return 
MockMessagingService.when(verb(Verb.FINALIZE_PROPOSE_MSG)).respond((msgOut, to) 
->
         {
+            try
+            {
+                latch.await();
+            }
+            catch (InterruptedException e) { }
             if (timeout.contains(to))
                 return null;
 
@@ -310,4 +329,9 @@ public class CoordinatorMessagingTest extends 
AbstractRepairTest
     {
         return new RepairSessionResult(coordinator.sessionID, "ks", 
coordinator.ranges, null, false);
     }
+
+    private CountDownLatch createLatch()
+    {
+        return new CountDownLatch(1);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to