This is an automated email from the ASF dual-hosted git repository.
djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 311891f Shuffle forwarding replica for messages to non-local DC.
311891f is described below
commit 311891f35e538f4a8be0309f6d7045fef59dee71
Author: Jon Meredith <[email protected]>
AuthorDate: Mon Sep 9 10:06:20 2019 -0600
Shuffle forwarding replica for messages to non-local DC.
Patch by Jon Meredith, reviewed by Dinesh Joshi for CASSANDRA-15318
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/service/StorageProxy.java | 15 ++++++++++-----
.../cassandra/distributed/test/MessageForwardingTest.java | 11 +++++++++++
3 files changed, 22 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 53788a7..da57886 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha3
+ * Shuffle forwarding replica for messages to non-local DC (CASSANDRA-15318)
* Optimise native protocol ASCII string encoding (CASSANDRA-15410)
* Make sure all exceptions are propagated in DebuggableThreadPoolExecutor
(CASSANDRA-15332)
* Make it possible to resize concurrent read / write thread pools at runtime
(CASSANDRA-15277)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index ded48bf..11c72ec 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1283,16 +1283,17 @@ public class StorageProxy implements StorageProxyMBean
/*
* Send the message to the first replica of targets, and have it forward
the message to others in its DC
- *
- * TODO: are targets shuffled? do we want them to be to spread out
forwarding burden?
*/
private static void sendMessagesToNonlocalDC(Message<? extends IMutation>
message,
EndpointsForToken targets,
AbstractWriteResponseHandler<IMutation> handler)
{
+ final Replica target;
+
if (targets.size() > 1)
{
- EndpointsForToken forwardToReplicas = targets.subList(1,
targets.size());
+ target = targets.get(ThreadLocalRandom.current().nextInt(0,
targets.size()));
+ EndpointsForToken forwardToReplicas = targets.filter(r -> r !=
target, targets.size());
for (Replica replica : forwardToReplicas)
{
@@ -1306,9 +1307,13 @@ public class StorageProxy implements StorageProxyMBean
message = message.withForwardTo(new
ForwardingInfo(forwardToReplicas.endpointList(), messageIds));
}
+ else
+ {
+ target = targets.get(0);
+ }
- MessagingService.instance().sendWriteWithCallback(message,
targets.get(0), handler, true);
- logger.trace("Sending message to {}@{}", message.id(), targets.get(0));
+ MessagingService.instance().sendWriteWithCallback(message, target,
handler, true);
+ logger.trace("Sending message to {}@{}", message.id(), target);
}
private static void performLocally(Stage stage, Replica localReplica,
final Runnable runnable)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
index 72928e4..7c3d7a2 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.impl.IsolatedExecutor;
import org.apache.cassandra.distributed.impl.TracingUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.UUIDGen;
public class MessageForwardingTest extends DistributedTestBase
@@ -44,6 +45,7 @@ public class MessageForwardingTest extends DistributedTestBase
{
String originalTraceTimeout =
TracingUtil.setWaitForTracingEventTimeoutSecs("1");
final int numInserts = 100;
+ Map<InetAddress,Integer> forwardFromCounts = new HashMap<>();
Map<InetAddress,Integer> commitCounts = new HashMap<>();
try (Cluster cluster = init(Cluster.build()
@@ -66,6 +68,7 @@ public class MessageForwardingTest extends DistributedTestBase
//noinspection ResultOfMethodCallIgnored
inserts.map(IsolatedExecutor::waitOn).count();
+ cluster.stream("dc1").forEach(instance ->
forwardFromCounts.put(instance.broadcastAddressAndPort().address, 0));
cluster.forEach(instance ->
commitCounts.put(instance.broadcastAddressAndPort().address, 0));
List<TracingUtil.TraceEntry> traces =
TracingUtil.getTrace(cluster, sessionId, ConsistencyLevel.ALL);
traces.forEach(traceEntry -> {
@@ -73,8 +76,16 @@ public class MessageForwardingTest extends
DistributedTestBase
{
commitCounts.compute(traceEntry.source, (k, v) -> (v !=
null ? v : 0) + 1);
}
+ else if (traceEntry.activity.contains("Enqueuing forwarded
write to "))
+ {
+ forwardFromCounts.compute(traceEntry.source, (k, v) -> (v
!= null ? v : 0) + 1);
+ }
});
+ // Check that each node in dc1 was the forwarder at least once.
The forwarder is picked uniformly at random among 3 nodes,
+ // so the chance that some node is never picked is about 3 * (2/3)^numInserts,
i.e. ~7e-18 for numInserts = 100.
+ forwardFromCounts.forEach((source, count) ->
Assert.assertTrue(source + " should have been randomized to forward messages",
count > 0));
+
// Check that each node received the forwarded messages once (and
only once)
commitCounts.forEach((source, count) ->
Assert.assertEquals(source + " appending to
commitlog traces",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org