This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new b824273 Remove assumption that all urgent messages are small
b824273 is described below
commit b8242730918c2e8edec83aeafeeae8255378125d
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Aug 12 10:47:54 2021 +0100
Remove assumption that all urgent messages are small
Patch by Sam Tunnicliffe; reviewed by Caleb Rackliffe for CASSANDRA-16877
---
CHANGES.txt | 1 +
.../apache/cassandra/net/OutboundConnections.java | 25 +++++++--
.../cassandra/net/OutboundConnectionsTest.java | 60 +++++++++++++++-------
3 files changed, 62 insertions(+), 24 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ecd4409..9ed3cec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.1
+ * Remove assumption that all urgent messages are small (CASSANDRA-16877)
* ArrayClustering.unsharedHeapSize does not include the data so undercounts the heap size (CASSANDRA-16845)
* Improve help, doc and error messages about sstabledump -k and -x arguments (CASSANDRA-16818)
* Add repaired/unrepaired bytes back to nodetool (CASSANDRA-15282)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnections.java b/src/java/org/apache/cassandra/net/OutboundConnections.java
index f1e1276..3f607d1 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnections.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnections.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.InternodeOutboundMetrics;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.apache.cassandra.net.MessagingService.current_version;
@@ -199,12 +200,26 @@ public class OutboundConnections
if (specifyConnection != null)
return specifyConnection;
- if (msg.verb().priority == Verb.Priority.P0)
- return URGENT_MESSAGES;
+ if (msg.serializedSize(current_version) > LARGE_MESSAGE_THRESHOLD)
+ {
+ if (msg.verb().priority == Verb.Priority.P0)
+ {
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+ "Enqueued URGENT message which exceeds large message threshold");
+
+ if (logger.isTraceEnabled())
+ logger.trace("{} message with size {} exceeded large message threshold {}",
+ msg.verb(),
+ msg.serializedSize(current_version),
+ LARGE_MESSAGE_THRESHOLD);
+ }
+
+ return LARGE_MESSAGES;
+ }
- return msg.serializedSize(current_version) <= LARGE_MESSAGE_THRESHOLD
- ? SMALL_MESSAGES
- : LARGE_MESSAGES;
+ return msg.verb().priority == Verb.Priority.P0
+ ? URGENT_MESSAGES
+ : SMALL_MESSAGES;
}
@VisibleForTesting
diff --git a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
index 32faea3..538636a 100644
--- a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
@@ -35,11 +35,15 @@ import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.gms.GossipDigestSyn;
+import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
+import static org.apache.cassandra.net.MessagingService.current_version;
+import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
+
public class OutboundConnectionsTest
{
static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9476);
@@ -48,6 +52,24 @@ public class OutboundConnectionsTest
private static final List<ConnectionType> INTERNODE_MESSAGING_CONN_TYPES = ImmutableList.of(ConnectionType.URGENT_MESSAGES, ConnectionType.LARGE_MESSAGES, ConnectionType.SMALL_MESSAGES);
private OutboundConnections connections;
+ // for testing messages larger than the size threshold, we just need a serializer to report a size, as fake as it may be
+ public static final IVersionedSerializer<Object> SERIALIZER = new IVersionedSerializer<Object>()
+ {
+ public void serialize(Object o, DataOutputPlus out, int version)
+ {
+
+ }
+
+ public Object deserialize(DataInputPlus in, int version)
+ {
+ return null;
+ }
+
+ public long serializedSize(Object o, int version)
+ {
+ return LARGE_MESSAGE_THRESHOLD + 1;
+ }
+ };
@BeforeClass
public static void before()
@@ -78,6 +100,24 @@ public class OutboundConnectionsTest
}
@Test
+ public void getConnection_Gossip_Oversized() throws NoSuchFieldException, IllegalAccessException
+ {
+ IVersionedAsymmetricSerializer<?,?> restore = Verb.GOSSIP_DIGEST_ACK.serializer();
+ try
+ {
+ Verb.GOSSIP_DIGEST_ACK.unsafeSetSerializer(() -> SERIALIZER);
+ Message message = Message.out(Verb.GOSSIP_DIGEST_ACK, "payload");
+ Assert.assertTrue(message.serializedSize(current_version) > LARGE_MESSAGE_THRESHOLD);
+ Assert.assertEquals(ConnectionType.LARGE_MESSAGES, connections.connectionFor(message).type());
+ }
+ finally
+ {
+ Verb.GOSSIP_DIGEST_ACK.unsafeSetSerializer(() -> restore);
+ }
+ }
+
+
+ @Test
public void getConnection_SmallMessage()
{
Message message = Message.out(Verb.PING_REQ, PingRequest.forSmall);
@@ -87,25 +127,7 @@ public class OutboundConnectionsTest
@Test
public void getConnection_LargeMessage() throws NoSuchFieldException, IllegalAccessException
{
- // just need a serializer to report a size, as fake as it may be
- IVersionedSerializer<Object> serializer = new IVersionedSerializer<Object>()
- {
- public void serialize(Object o, DataOutputPlus out, int version)
- {
-
- }
-
- public Object deserialize(DataInputPlus in, int version)
- {
- return null;
- }
-
- public long serializedSize(Object o, int version)
- {
- return OutboundConnections.LARGE_MESSAGE_THRESHOLD + 1;
- }
- };
- Verb._TEST_2.unsafeSetSerializer(() -> serializer);
+ Verb._TEST_2.unsafeSetSerializer(() -> SERIALIZER);
Message message = Message.out(Verb._TEST_2, "payload");
Assert.assertEquals(ConnectionType.LARGE_MESSAGES, connections.connectionFor(message).type());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]