This is an automated email from the ASF dual-hosted git repository.
konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 52d3b23df2 junit fix: avoid MatcherResponse creating thousands of threads
when many messages are handled using MockMessagingSpy
52d3b23df2 is described below
commit 52d3b23df2084860dbf43d958cf2b069678ffdd1
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Wed Feb 18 11:22:24 2026 +0000
junit fix: avoid MatcherResponse creating thousands of threads when
many messages are handled using MockMessagingSpy
Too many threads can lead to an OOM, so new Thread(...) is replaced with an
executor. A spin wait is added to HintsServiceTest.testPageSeek to avoid a
flaky NPE (HintsStore is populated asynchronously).
patch by Dmitry Konstantinov; reviewed by Brandon Williams for
CASSANDRA-21166
---
.../cassandra/hints/HintServiceBytemanTest.java | 34 +++---
.../apache/cassandra/hints/HintsServiceTest.java | 118 ++++++++++++---------
.../org/apache/cassandra/net/MatcherResponse.java | 14 ++-
.../apache/cassandra/net/MockMessagingService.java | 13 ++-
.../cassandra/net/MockMessagingServiceTest.java | 30 +++---
.../org/apache/cassandra/net/MockMessagingSpy.java | 15 ++-
6 files changed, 136 insertions(+), 88 deletions(-)
diff --git a/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java
b/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java
index e296c0a583..72f29e445d 100644
--- a/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java
@@ -104,21 +104,23 @@ public class HintServiceBytemanTest
public void testListPendingHints() throws InterruptedException,
ExecutionException, TimeoutException
{
HintsService.instance.resumeDispatch();
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, -1);
- Awaitility.await("For the hints file to flush")
-
.atMost(Duration.ofMillis(DatabaseDescriptor.getHintsFlushPeriodInMS() * 2L))
- .until(() ->
!HintsService.instance.getPendingHints().isEmpty());
-
- List<PendingHintsInfo> pendingHints =
HintsService.instance.getPendingHintsInfo();
- assertEquals(1, pendingHints.size());
- PendingHintsInfo info = pendingHints.get(0);
- assertEquals(StorageService.instance.getLocalHostUUID(), info.hostId);
- assertEquals(1, info.totalFiles);
- assertEquals(info.oldestTimestamp, info.newestTimestamp); // there is
1 descriptor with only 1 timestamp
-
- // JDK21 genZGC uncovered some flakiness / hanging here waiting on
Condition
- spy.interceptMessageOut(20000).get(60, TimeUnit.SECONDS);
- spy.printMessageCounts();
- assertEquals(Collections.emptyList(),
HintsService.instance.getPendingHints());
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, -1))
+ {
+ Awaitility.await("For the hints file to flush")
+
.atMost(Duration.ofMillis(DatabaseDescriptor.getHintsFlushPeriodInMS() * 2L))
+ .until(() ->
!HintsService.instance.getPendingHints().isEmpty());
+
+ List<PendingHintsInfo> pendingHints =
HintsService.instance.getPendingHintsInfo();
+ assertEquals(1, pendingHints.size());
+ PendingHintsInfo info = pendingHints.get(0);
+ assertEquals(StorageService.instance.getLocalHostUUID(),
info.hostId);
+ assertEquals(1, info.totalFiles);
+ assertEquals(info.oldestTimestamp, info.newestTimestamp); // there
is 1 descriptor with only 1 timestamp
+
+ // JDK21 genZGC uncovered some flakiness / hanging here waiting on
Condition
+ spy.interceptMessageOut(20000).get(60, TimeUnit.SECONDS);
+ spy.printMessageCounts();
+ assertEquals(Collections.emptyList(),
HintsService.instance.getPendingHints());
+ }
}
}
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index 6249a5f3b7..ddfe4a2c8b 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -29,6 +30,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -67,6 +69,7 @@ import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.MockFailureDetector;
import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.Util.spinAssertEquals;
import static
org.apache.cassandra.config.CassandraRelevantProperties.HINT_DISPATCH_INTERVAL_MS;
import static org.apache.cassandra.hints.HintsTestUtil.sendHintsAndResponses;
@@ -139,14 +142,15 @@ public class HintsServiceTest
long cnt = StorageMetrics.totalHints.getCount();
// create spy for hint messages
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1);
-
- // metrics should have been updated with number of create hints
- assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
-
- // wait until hints have been send
- spy.interceptMessageOut(100).get();
- spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get();
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1))
+ {
+ // metrics should have been updated with number of create hints
+ assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
+
+ // wait until hints have been send
+ spy.interceptMessageOut(100).get();
+ spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get();
+ }
}
@Test
@@ -155,66 +159,78 @@ public class HintsServiceTest
HintsService.instance.pauseDispatch();
// create spy for hint messages
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1);
-
- // we should not send any hints while paused
- ListenableFuture<Boolean> noMessagesWhilePaused =
spy.interceptNoMsg(15, TimeUnit.SECONDS);
- Futures.addCallback(noMessagesWhilePaused, new
MoreFutures.SuccessCallback<Boolean>()
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1))
{
- public void onSuccess(@Nullable Boolean aBoolean)
+
+ // we should not send any hints while paused
+ ListenableFuture<Boolean> noMessagesWhilePaused =
spy.interceptNoMsg(15, TimeUnit.SECONDS);
+ Futures.addCallback(noMessagesWhilePaused, new
MoreFutures.SuccessCallback<Boolean>()
{
- HintsService.instance.resumeDispatch();
- }
- }, MoreExecutors.directExecutor());
-
- Futures.allAsList(
- noMessagesWhilePaused,
- spy.interceptMessageOut(100),
- spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
- ).get();
+ public void onSuccess(@Nullable Boolean aBoolean)
+ {
+ HintsService.instance.resumeDispatch();
+ }
+ }, MoreExecutors.directExecutor());
+
+ Futures.allAsList(
+ noMessagesWhilePaused,
+ spy.interceptMessageOut(100),
+ spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
+ ).get();
+ }
}
@Test
public void testPageRetry() throws InterruptedException,
ExecutionException, TimeoutException
{
// create spy for hint messages, but only create responses for 5 hints
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 20, 5);
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20, 5))
+ {
- Futures.allAsList(
- // the dispatcher will always send all hints within the
current page
- // and only wait for the acks before going to the next page
- spy.interceptMessageOut(20),
- spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
+ Futures.allAsList(
+ // the dispatcher will always send all hints within the current
page
+ // and only wait for the acks before going to the next page
+ spy.interceptMessageOut(20),
+ spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
- // next tick will trigger a retry of the same page as we only
replied with 5/20 acks
- spy.interceptMessageOut(20)
- ).get();
+ // next tick will trigger a retry of the same page as we only
replied with 5/20 acks
+ spy.interceptMessageOut(20)
+ ).get();
- // marking the destination node as dead should stop sending hints
- failureDetector.isAlive = false;
- spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
+ // marking the destination node as dead should stop sending hints
+ failureDetector.isAlive = false;
+ spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
+ }
}
@Test
public void testPageSeek() throws InterruptedException, ExecutionException
{
// create spy for hint messages, stop replying after 12k (should be on
3rd page)
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, 12000);
-
- // At this point the dispatcher will constantly retry the page we
stopped acking,
- // thus we receive the same hints from the page multiple times and in
total more than
- // all written hints. Lets just consume them for a while and then
pause the dispatcher.
- spy.interceptMessageOut(22000).get();
- HintsService.instance.pauseDispatch();
- Thread.sleep(1000);
-
- // verify that we have a dispatch offset set for the page we're
currently stuck at
- HintsStore store =
HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
- HintsDescriptor descriptor = store.poll();
- store.offerFirst(descriptor); // add again for cleanup during
re-instanciation
- InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
- assertTrue(dispatchOffset != null);
- assertTrue(((ChecksummedDataInput.Position)
dispatchOffset).sourcePosition > 0);
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000,
12000))
+ {
+ // At this point the dispatcher will constantly retry the page we
stopped acking,
+ // thus we receive the same hints from the page multiple times and
in total more than
+ // all written hints. Lets just consume them for a while and then
pause the dispatcher.
+ spy.interceptMessageOut(22000).get();
+ HintsService.instance.pauseDispatch();
+ Thread.sleep(1000);
+
+ // verify that we have a dispatch offset set for the page we're
currently stuck at
+ HintsStore store =
HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
+ AtomicReference<HintsDescriptor> hintDescriptorRef = new
AtomicReference<>();
+ Awaitility.waitAtMost(20, SECONDS).until(() -> {
+ HintsDescriptor descriptor = store.poll();
+ if (descriptor != null)
+ hintDescriptorRef.set(descriptor);
+ return descriptor != null;
+ });
+ HintsDescriptor descriptor = hintDescriptorRef.get();
+ store.offerFirst(descriptor); // add again for cleanup during
re-instanciation
+ InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
+ assertTrue(dispatchOffset != null);
+ assertTrue(((ChecksummedDataInput.Position)
dispatchOffset).sourcePosition > 0);
+ }
}
/*
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java
b/test/unit/org/apache/cassandra/net/MatcherResponse.java
index cad03fcc71..228272b3c8 100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.net;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -37,7 +38,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
* The actual behavior by any instance of this class can be inspected by
* interacting with the returned {@link MockMessagingSpy}.
*/
-public class MatcherResponse
+public class MatcherResponse implements Closeable
{
private final Matcher<?> matcher;
private final Multimap<Long, InetAddressAndPort> sendResponses =
@@ -182,7 +183,7 @@ public class MatcherResponse
}
// create response asynchronously to match
request/response communication execution behavior
- new Thread(() ->
+ spy.responseExecutor.execute(() ->
{
Message<?> response = fnResponse.apply(message, to);
if (response != null)
@@ -202,7 +203,7 @@ public class MatcherResponse
spy.matchingResponse(response);
}
- }).start();
+ });
return false;
}
@@ -238,4 +239,11 @@ public class MatcherResponse
{
MessagingService.instance().outboundSink.remove(sink);
}
+
+ @Override
+ public void close()
+ {
+ destroy();
+ spy.close();
+ }
}
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java
b/test/unit/org/apache/cassandra/net/MockMessagingService.java
index 72e9a296b2..bbeadd9297 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingService.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.net;
import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,12 +39,16 @@ public class MockMessagingService
{
}
+ private static final List<MatcherResponse> matcherResponses = new
CopyOnWriteArrayList<>();
+
/**
* Creates a MatcherResponse based on specified matcher.
*/
public static MatcherResponse when(Matcher matcher)
{
- return new MatcherResponse(matcher);
+ MatcherResponse matcherResponse = new MatcherResponse(matcher);
+ matcherResponses.add(matcherResponse);
+ return matcherResponse;
}
/**
@@ -53,6 +59,11 @@ public class MockMessagingService
{
MessagingService.instance().outboundSink.clear();
MessagingService.instance().inboundSink.clear();
+ for (MatcherResponse matcher : matcherResponses)
+ {
+ matcher.close();
+ }
+ matcherResponses.clear();
}
/**
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
index 0e6f3d2b16..4d29c9256a 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
@@ -56,28 +56,30 @@ public class MockMessagingServiceTest
{
// echo message that we like to mock as incoming response for outgoing
echo message
Message<NoPayload> echoMessage = Message.out(ECHO_REQ,
NoPayload.noPayload);
- MockMessagingSpy spy = MockMessagingService
+ try(MockMessagingSpy spy = MockMessagingService
.when(
all(
to(FBUtilities.getBroadcastAddressAndPort()),
verb(ECHO_REQ)
)
)
- .respond(echoMessage);
-
- Message<NoPayload> echoMessageOut = Message.out(ECHO_REQ,
NoPayload.noPayload);
- MessagingService.instance().sendWithCallback(echoMessageOut,
FBUtilities.getBroadcastAddressAndPort(), msg ->
+ .respond(echoMessage))
{
- assertEquals(ECHO_REQ, msg.verb());
- assertEquals(echoMessage.payload, msg.payload);
- });
- // we must have intercepted the outgoing message at this point
- Message<?> msg = spy.captureMessageOut().get();
- assertEquals(1, spy.messagesIntercepted());
- assertSame(echoMessage.payload, msg.payload);
+ Message<NoPayload> echoMessageOut = Message.out(ECHO_REQ,
NoPayload.noPayload);
+ MessagingService.instance().sendWithCallback(echoMessageOut,
FBUtilities.getBroadcastAddressAndPort(), msg ->
+ {
+ assertEquals(ECHO_REQ, msg.verb());
+ assertEquals(echoMessage.payload, msg.payload);
+ });
+
+ // we must have intercepted the outgoing message at this point
+ Message<?> msg = spy.captureMessageOut().get();
+ assertEquals(1, spy.messagesIntercepted());
+ assertSame(echoMessage.payload, msg.payload);
- // and return a mocked response
- Util.spinAssertEquals(1, spy::mockedMessageResponses, 60);
+ // and return a mocked response
+ Util.spinAssertEquals(1, spy::mockedMessageResponses, 60);
+ }
}
}
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
index b27c9b0d7b..998ef41f81 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.net;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +50,7 @@ import static
org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQu
* @see MatcherResponse
* @see MockMessagingService
*/
-public class MockMessagingSpy
+public class MockMessagingSpy implements AutoCloseable
{
private static final Logger logger =
LoggerFactory.getLogger(MockMessagingSpy.class);
@@ -73,7 +73,9 @@ public class MockMessagingSpy
private final BlockingQueue<Message<?>> interceptedMessages =
newBlockingQueue();
private final BlockingQueue<Message<?>> deliveredResponses =
newBlockingQueue();
- private static final Executor executor =
Executors.newSingleThreadExecutor();
+ private final ExecutorService executor =
Executors.newSingleThreadExecutor();
+ final ExecutorService responseExecutor = Executors.newFixedThreadPool(5);
+
/**
* Returns a future with the first mocked incoming message that has been
created and delivered.
@@ -186,6 +188,13 @@ public class MockMessagingSpy
deliveredResponses.add(response);
}
+ @Override
+ public void close()
+ {
+ executor.shutdown();
+ responseExecutor.shutdown();
+ }
+
private static class CapturedResultsFuture<T> extends
AbstractFuture<List<T>> implements Runnable
{
private final int waitForResults;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]