This is an automated email from the ASF dual-hosted git repository.
stefania pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4e640a Fix flaky CoordinatorMessagingTest and docstring in
OutboundSink and ConsistentSession
b4e640a is described below
commit b4e640a96e76f8d4a45937b1312b64ddc1aeb8ac
Author: Aleksandr Sorokoumov <[email protected]>
AuthorDate: Tue Mar 31 15:53:51 2020 +0200
Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and
ConsistentSession
patch by Aleksandr Sorokoumov; reviewed by Stefania Alborghetti for
CASSANDRA-15672
---
CHANGES.txt | 1 +
.../org/apache/cassandra/net/OutboundSink.java | 2 +-
.../repair/consistent/ConsistentSession.java | 8 +--
.../consistent/CoordinatorMessagingTest.java | 70 +++++++++++++++-------
4 files changed, 53 insertions(+), 28 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 65111d0..fb881de 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and
ConsistentSession (CASSANDRA-15672)
* Fix force compaction of wrapping ranges (CASSANDRA-15664)
* Expose repair streaming metrics (CASSANDRA-15656)
* Set now in seconds in the future for validation repairs (CASSANDRA-15655)
diff --git a/src/java/org/apache/cassandra/net/OutboundSink.java
b/src/java/org/apache/cassandra/net/OutboundSink.java
index d19b3e2..34c72db 100644
--- a/src/java/org/apache/cassandra/net/OutboundSink.java
+++ b/src/java/org/apache/cassandra/net/OutboundSink.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
/**
* A message sink that all outbound messages go through.
*
- * Default sink {@link Sink} used by {@link MessagingService} is
MessagingService#doSend(), which proceeds to
+ * Default sink {@link Sink} used by {@link MessagingService} is {@link
MessagingService#doSend(Message, InetAddressAndPort, ConnectionType)}, which
proceeds to
* send messages over the network, but it can be overridden to filter out
certain messages, record the fact
* of attempted delivery, or delay their delivery.
*
diff --git
a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index 03de157..d9ac927 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -56,13 +56,13 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
* There are 4 stages to a consistent incremental repair.
*
* <h1>Repair prepare</h1>
- * First, the normal {@link ActiveRepairService#prepareForRepair(UUID,
InetAddressAndPort, Set, RepairOption, List)} stuff
+ * First, the normal {@link ActiveRepairService#prepareForRepair(UUID,
InetAddressAndPort, Set, RepairOption, boolean, List)} stuff
* happens, which sends out {@link PrepareMessage} and creates a {@link
ActiveRepairService.ParentRepairSession}
* on the coordinator and each of the neighbors.
*
* <h1>Consistent prepare</h1>
* The consistent prepare step promotes the parent repair session to a
consistent session, and isolates the sstables
- * being repaired other sstables. First, the coordinator sends a {@link
PrepareConsistentRequest} message to each repair
+ * being repaired from other sstables. First, the coordinator sends a {@link
PrepareConsistentRequest} message to each repair
* participant (including itself). When received, the node creates a {@link
LocalSession} instance, sets its state to
* {@code PREPARING}, persists it, and begins preparing the tables for
incremental repair, which segregates the data
* being repaired from the rest of the table data. When the preparation
completes, the session state is set to
@@ -74,7 +74,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
* Once the coordinator receives positive {@code PrepareConsistentResponse}
messages from all the participants, the
* coordinator begins the normal repair process.
* <p/>
- * (see {@link CoordinatorSession#handlePrepareResponse(InetAddress, boolean)}
+ * (see {@link CoordinatorSession#handlePrepareResponse(InetAddressAndPort,
boolean)}
*
* <h1>Repair</h1>
* The coordinator runs the normal data repair process against the sstables
segregated in the previous step. When a
@@ -96,7 +96,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
* conflicts with in progress compactions. The sstables will be marked
repaired as part of the normal compaction process.
* <p/>
*
- * On the coordinator side, see {@link CoordinatorSession#finalizePropose()},
{@link CoordinatorSession#handleFinalizePromise(InetAddress, boolean)},
+ * On the coordinator side, see {@link CoordinatorSession#finalizePropose()},
{@link CoordinatorSession#handleFinalizePromise(InetAddressAndPort, boolean)},
* & {@link CoordinatorSession#finalizeCommit()}
* <p/>
*
diff --git
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index c9fd913..6f0d846 100644
---
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -43,14 +44,11 @@ import
org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MockMessagingService;
import org.apache.cassandra.net.MockMessagingSpy;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.repair.RepairSessionResult;
-import org.apache.cassandra.repair.messages.FailSession;
-import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
@@ -96,9 +94,11 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
@Test
public void testMockedMessagingHappyPath() throws InterruptedException,
ExecutionException, TimeoutException
{
+ CountDownLatch prepareLatch = createLatch();
+ CountDownLatch finalizeLatch = createLatch();
- MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(),
Collections.emptySet());
- MockMessagingSpy spyFinalize =
createFinalizeSpy(Collections.emptySet(), Collections.emptySet());
+ MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(),
Collections.emptySet(), prepareLatch);
+ MockMessagingSpy spyFinalize =
createFinalizeSpy(Collections.emptySet(), Collections.emptySet(),
finalizeLatch);
MockMessagingSpy spyCommit = createCommitSpy();
UUID uuid = registerSession(cfs, true, true);
@@ -120,6 +120,7 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
Assert.assertFalse(sessionResult.isDone());
Assert.assertFalse(hasFailures.get());
// prepare completed
+ prepareLatch.countDown();
spyPrepare.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
Assert.assertFalse(sessionResult.isDone());
Assert.assertFalse(hasFailures.get());
@@ -128,9 +129,8 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
repairFuture.set(Lists.newArrayList(createResult(coordinator),
createResult(coordinator), createResult(coordinator)));
// finalize phase
+ finalizeLatch.countDown();
spyFinalize.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
- Assert.assertFalse(sessionResult.isDone());
- Assert.assertFalse(hasFailures.get());
// commit phase
spyCommit.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
@@ -150,39 +150,44 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
@Test
public void testMockedMessagingPrepareFailureP1() throws
InterruptedException, ExecutionException, TimeoutException
{
- createPrepareSpy(Collections.singleton(PARTICIPANT1),
Collections.emptySet());
- testMockedMessagingPrepareFailure();
+ CountDownLatch latch = createLatch();
+ createPrepareSpy(Collections.singleton(PARTICIPANT1),
Collections.emptySet(), latch);
+ testMockedMessagingPrepareFailure(latch);
}
@Test
public void testMockedMessagingPrepareFailureP12() throws
InterruptedException, ExecutionException, TimeoutException
{
- createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2),
Collections.emptySet());
- testMockedMessagingPrepareFailure();
+ CountDownLatch latch = createLatch();
+ createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2),
Collections.emptySet(), latch);
+ testMockedMessagingPrepareFailure(latch);
}
@Test
public void testMockedMessagingPrepareFailureP3() throws
InterruptedException, ExecutionException, TimeoutException
{
- createPrepareSpy(Collections.singleton(PARTICIPANT3),
Collections.emptySet());
- testMockedMessagingPrepareFailure();
+ CountDownLatch latch = createLatch();
+ createPrepareSpy(Collections.singleton(PARTICIPANT3),
Collections.emptySet(), latch);
+ testMockedMessagingPrepareFailure(latch);
}
@Test
public void testMockedMessagingPrepareFailureP123() throws
InterruptedException, ExecutionException, TimeoutException
{
- createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2,
PARTICIPANT3), Collections.emptySet());
- testMockedMessagingPrepareFailure();
+ CountDownLatch latch = createLatch();
+ createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2,
PARTICIPANT3), Collections.emptySet(), latch);
+ testMockedMessagingPrepareFailure(latch);
}
@Test(expected = TimeoutException.class)
public void testMockedMessagingPrepareFailureWrongSessionId() throws
InterruptedException, ExecutionException, TimeoutException
{
- createPrepareSpy(Collections.singleton(PARTICIPANT1),
Collections.emptySet(), (msgOut) -> UUID.randomUUID());
- testMockedMessagingPrepareFailure();
+ CountDownLatch latch = createLatch();
+ createPrepareSpy(Collections.singleton(PARTICIPANT1),
Collections.emptySet(), (msgOut) -> UUID.randomUUID(), latch);
+ testMockedMessagingPrepareFailure(latch);
}
- private void testMockedMessagingPrepareFailure() throws
InterruptedException, ExecutionException, TimeoutException
+ private void testMockedMessagingPrepareFailure(CountDownLatch
prepareLatch) throws InterruptedException, ExecutionException, TimeoutException
{
// we expect FailSession messages to all participants
MockMessagingSpy sendFailSessionExpectedSpy =
createFailSessionSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2,
PARTICIPANT3));
@@ -204,6 +209,7 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
// execute repair and start prepare phase
ListenableFuture<Boolean> sessionResult =
coordinator.execute(sessionSupplier, proposeFailed);
Assert.assertFalse(proposeFailed.get());
+ prepareLatch.countDown();
// prepare completed
try
{
@@ -223,7 +229,7 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
@Test
public void testMockedMessagingPrepareTimeout() throws
InterruptedException, ExecutionException, TimeoutException
{
- MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(),
Collections.singleton(PARTICIPANT3));
+ MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(),
Collections.singleton(PARTICIPANT3), new CountDownLatch(0));
MockMessagingSpy sendFailSessionUnexpectedSpy =
createFailSessionSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2,
PARTICIPANT3));
UUID uuid = registerSession(cfs, true, true);
@@ -265,17 +271,24 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
}
private MockMessagingSpy createPrepareSpy(Collection<InetAddressAndPort>
failed,
- Collection<InetAddressAndPort>
timeout)
+ Collection<InetAddressAndPort>
timeout,
+ CountDownLatch latch)
{
- return createPrepareSpy(failed, timeout, (msgOut) ->
msgOut.parentSession);
+ return createPrepareSpy(failed, timeout, (msgOut) ->
msgOut.parentSession, latch);
}
private MockMessagingSpy createPrepareSpy(Collection<InetAddressAndPort>
failed,
Collection<InetAddressAndPort>
timeout,
-
Function<PrepareConsistentRequest, UUID> sessionIdFunc)
+
Function<PrepareConsistentRequest, UUID> sessionIdFunc,
+ CountDownLatch latch)
{
return
MockMessagingService.when(verb(Verb.PREPARE_CONSISTENT_REQ)).respond((msgOut,
to) ->
{
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e) { }
if (timeout.contains(to))
return null;
@@ -285,10 +298,16 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
}
private MockMessagingSpy createFinalizeSpy(Collection<InetAddressAndPort>
failed,
- Collection<InetAddressAndPort>
timeout)
+ Collection<InetAddressAndPort>
timeout,
+ CountDownLatch latch)
{
return
MockMessagingService.when(verb(Verb.FINALIZE_PROPOSE_MSG)).respond((msgOut, to)
->
{
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e) { }
if (timeout.contains(to))
return null;
@@ -310,4 +329,9 @@ public class CoordinatorMessagingTest extends
AbstractRepairTest
{
return new RepairSessionResult(coordinator.sessionID, "ks",
coordinator.ranges, null, false);
}
+
+ private CountDownLatch createLatch()
+ {
+ return new CountDownLatch(1);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]