This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new b824273  Remove assumption that all urgent messages are small
b824273 is described below

commit b8242730918c2e8edec83aeafeeae8255378125d
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Aug 12 10:47:54 2021 +0100

    Remove assumption that all urgent messages are small
    
    Patch by Sam Tunnicliffe; reviewed by Caleb Rackliffe for CASSANDRA-16877
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/net/OutboundConnections.java  | 25 +++++++--
 .../cassandra/net/OutboundConnectionsTest.java     | 60 +++++++++++++++-------
 3 files changed, 62 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ecd4409..9ed3cec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.1
+ * Remove assumption that all urgent messages are small (CASSANDRA-16877)
  * ArrayClustering.unsharedHeapSize does not include the data so undercounts 
the heap size (CASSANDRA-16845)
  * Improve help, doc and error messages about sstabledump -k and -x arguments 
(CASSANDRA-16818)
  * Add repaired/unrepaired bytes back to nodetool (CASSANDRA-15282)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnections.java 
b/src/java/org/apache/cassandra/net/OutboundConnections.java
index f1e1276..3f607d1 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnections.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnections.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.InternodeOutboundMetrics;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.apache.cassandra.net.MessagingService.current_version;
@@ -199,12 +200,26 @@ public class OutboundConnections
         if (specifyConnection != null)
             return specifyConnection;
 
-        if (msg.verb().priority == Verb.Priority.P0)
-            return URGENT_MESSAGES;
+        if (msg.serializedSize(current_version) > LARGE_MESSAGE_THRESHOLD)
+        {
+            if (msg.verb().priority == Verb.Priority.P0)
+            {
+                NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES,
+                                 "Enqueued URGENT message which exceeds large 
message threshold");
+
+                if (logger.isTraceEnabled())
+                    logger.trace("{} message with size {} exceeded large 
message threshold {}",
+                                 msg.verb(),
+                                 msg.serializedSize(current_version),
+                                 LARGE_MESSAGE_THRESHOLD);
+            }
+
+            return LARGE_MESSAGES;
+        }
 
-        return msg.serializedSize(current_version) <= LARGE_MESSAGE_THRESHOLD
-               ? SMALL_MESSAGES
-               : LARGE_MESSAGES;
+        return msg.verb().priority == Verb.Priority.P0
+               ? URGENT_MESSAGES
+               : SMALL_MESSAGES;
     }
 
     @VisibleForTesting
diff --git a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java 
b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
index 32faea3..538636a 100644
--- a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
@@ -35,11 +35,15 @@ import org.junit.Test;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.gms.GossipDigestSyn;
+import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
+import static org.apache.cassandra.net.MessagingService.current_version;
+import static 
org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
+
 public class OutboundConnectionsTest
 {
     static final InetAddressAndPort LOCAL_ADDR = 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"),
 9476);
@@ -48,6 +52,24 @@ public class OutboundConnectionsTest
     private static final List<ConnectionType> INTERNODE_MESSAGING_CONN_TYPES = 
ImmutableList.of(ConnectionType.URGENT_MESSAGES, ConnectionType.LARGE_MESSAGES, 
ConnectionType.SMALL_MESSAGES);
 
     private OutboundConnections connections;
+    // for testing messages larger than the size threshold, we just need a 
serializer to report a size, as fake as it may be
+    public static final IVersionedSerializer<Object> SERIALIZER = new 
IVersionedSerializer<Object>()
+    {
+        public void serialize(Object o, DataOutputPlus out, int version)
+        {
+
+        }
+
+        public Object deserialize(DataInputPlus in, int version)
+        {
+            return null;
+        }
+
+        public long serializedSize(Object o, int version)
+        {
+            return LARGE_MESSAGE_THRESHOLD + 1;
+        }
+    };
 
     @BeforeClass
     public static void before()
@@ -78,6 +100,24 @@ public class OutboundConnectionsTest
     }
 
     @Test
+    public void getConnection_Gossip_Oversized() throws NoSuchFieldException, 
IllegalAccessException
+    {
+        IVersionedAsymmetricSerializer<?,?> restore = 
Verb.GOSSIP_DIGEST_ACK.serializer();
+        try
+        {
+            Verb.GOSSIP_DIGEST_ACK.unsafeSetSerializer(() -> SERIALIZER);
+            Message message = Message.out(Verb.GOSSIP_DIGEST_ACK, "payload");
+            Assert.assertTrue(message.serializedSize(current_version) > 
LARGE_MESSAGE_THRESHOLD);
+            Assert.assertEquals(ConnectionType.LARGE_MESSAGES, 
connections.connectionFor(message).type());
+        }
+        finally
+        {
+            Verb.GOSSIP_DIGEST_ACK.unsafeSetSerializer(() -> restore);
+        }
+    }
+
+
+    @Test
     public void getConnection_SmallMessage()
     {
         Message message = Message.out(Verb.PING_REQ, PingRequest.forSmall);
@@ -87,25 +127,7 @@ public class OutboundConnectionsTest
     @Test
     public void getConnection_LargeMessage() throws NoSuchFieldException, 
IllegalAccessException
     {
-        // just need a serializer to report a size, as fake as it may be
-        IVersionedSerializer<Object> serializer = new 
IVersionedSerializer<Object>()
-        {
-            public void serialize(Object o, DataOutputPlus out, int version)
-            {
-
-            }
-
-            public Object deserialize(DataInputPlus in, int version)
-            {
-                return null;
-            }
-
-            public long serializedSize(Object o, int version)
-            {
-                return OutboundConnections.LARGE_MESSAGE_THRESHOLD + 1;
-            }
-        };
-        Verb._TEST_2.unsafeSetSerializer(() -> serializer);
+        Verb._TEST_2.unsafeSetSerializer(() -> SERIALIZER);
         Message message = Message.out(Verb._TEST_2, "payload");
         Assert.assertEquals(ConnectionType.LARGE_MESSAGES, 
connections.connectionFor(message).type());
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to