This is an automated email from the ASF dual-hosted git repository.

clohfink pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da90439  Make native_transport_max_concurrent_requests_in_bytes 
updatable
da90439 is described below

commit da90439f0052c3a05aaf6d4268a8c719e10fafde
Author: Jordan West <[email protected]>
AuthorDate: Fri Feb 7 08:24:14 2020 -0800

    Make native_transport_max_concurrent_requests_in_bytes updatable
    
    Patch by Jordan West; Reviewed by Chris Lohfink CASSANDRA-15519
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/net/ResourceLimits.java   |  38 ++++-
 .../apache/cassandra/service/StorageService.java   |  25 ++++
 .../cassandra/service/StorageServiceMBean.java     |   7 +
 .../org/apache/cassandra/transport/Server.java     |  28 ++++
 .../InflightRequestPayloadTrackerTest.java         | 153 ++++++++++++++++++++-
 6 files changed, 248 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 997d9d0..33a0581 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Make native_transport_max_concurrent_requests_in_bytes updatable 
(CASSANDRA-15519)
  * Cleanup and improvements to IndexInfo/ColumnIndex (CASSANDRA-15469)
  * Potential Overflow in DatabaseDescriptor Functions That Convert Between 
KB/MB & Bytes (CASSANDRA-15470)
 4.0-alpha3
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java 
b/src/java/org/apache/cassandra/net/ResourceLimits.java
index f8d24d7..4c97c2a 100644
--- a/src/java/org/apache/cassandra/net/ResourceLimits.java
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -36,6 +36,20 @@ public abstract class ResourceLimits
         long limit();
 
         /**
+         * Sets the total amount of permits represented by this {@link Limit} 
- the capacity
+         *
+         * If the old limit has been reached and the new limit is large enough 
to allow for more
+         * permits to be aqcuired, subsequent calls to {@link #allocate(long)} 
or {@link #tryAllocate(long)}
+         * will succeed.
+         *
+         * If the new limit is lower than the current amount of allocated 
permits then subsequent calls
+         * to {@link #allocate(long)} or {@link #tryAllocate(long)} will block 
or fail respectively.
+         *
+         * @return the old limit
+         */
+        long setLimit(long newLimit);
+
+        /**
          * @return remaining, unallocated permit amount
          */
         long remaining();
@@ -73,7 +87,9 @@ public abstract class ResourceLimits
      */
     public static class Concurrent implements Limit
     {
-        private final long limit;
+        private volatile long limit;
+        private static final AtomicLongFieldUpdater<Concurrent> limitUpdater =
+            AtomicLongFieldUpdater.newUpdater(Concurrent.class, "limit");
 
         private volatile long using;
         private static final AtomicLongFieldUpdater<Concurrent> usingUpdater =
@@ -89,6 +105,16 @@ public abstract class ResourceLimits
             return limit;
         }
 
+        public long setLimit(long newLimit)
+        {
+            long oldLimit;
+            do {
+                oldLimit = limit;
+            } while (!limitUpdater.compareAndSet(this, oldLimit, newLimit));
+
+            return oldLimit;
+        }
+
         public long remaining()
         {
             return limit - using;
@@ -139,7 +165,7 @@ public abstract class ResourceLimits
      */
     static class Basic implements Limit
     {
-        private final long limit;
+        private long limit;
         private long using;
 
         Basic(long limit)
@@ -152,6 +178,14 @@ public abstract class ResourceLimits
             return limit;
         }
 
+        public long setLimit(long newLimit)
+        {
+            long oldLimit = limit;
+            limit = newLimit;
+
+            return oldLimit;
+        }
+
         public long remaining()
         {
             return limit - using;
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 6dd5d37..aa03aea 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -96,6 +96,7 @@ import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.logging.LoggingSupportFactory;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -5447,6 +5448,30 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         logger.info("Setting corrupted tombstone strategy to {}", strategy);
     }
 
+    @Override
+    public long getNativeTransportMaxConcurrentRequestsInBytes()
+    {
+        return Server.EndpointPayloadTracker.getGlobalLimit();
+    }
+
+    @Override
+    public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit)
+    {
+        Server.EndpointPayloadTracker.setGlobalLimit(newLimit);
+    }
+
+    @Override
+    public long getNativeTransportMaxConcurrentRequestsInBytesPerIp()
+    {
+        return Server.EndpointPayloadTracker.getEndpointLimit();
+    }
+
+    @Override
+    public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long 
newLimit)
+    {
+        Server.EndpointPayloadTracker.setEndpointLimit(newLimit);
+    }
+
     @VisibleForTesting
     public void shutdownServer()
     {
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index a324a1c..dea25b4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -522,6 +522,13 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public void enableNativeTransportOldProtocolVersions();
     public void disableNativeTransportOldProtocolVersions();
 
+    // sets limits on number of concurrent requests in flights in number of 
bytes
+    public long getNativeTransportMaxConcurrentRequestsInBytes();
+    public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit);
+    public long getNativeTransportMaxConcurrentRequestsInBytesPerIp();
+    public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long 
newLimit);
+
+
     // allows a node that have been started without joining the ring to join it
     public void joinRing() throws IOException;
     public boolean isJoined();
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index c4690f1..43b024f 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -366,6 +366,34 @@ public class Server implements CassandraDaemon.Server
             }
         }
 
+        public static long getGlobalLimit()
+        {
+            return 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes();
+        }
+
+        public static void setGlobalLimit(long newLimit)
+        {
+            
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(newLimit);
+            long existingLimit = 
globalRequestPayloadInFlight.setLimit(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+            logger.info("Changed native_max_transport_requests_in_bytes from 
{} to {}", existingLimit, newLimit);
+        }
+
+        public static long getEndpointLimit()
+        {
+            return 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
+        }
+
+        public static void setEndpointLimit(long newLimit)
+        {
+            long existingLimit = 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
+            
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(newLimit);
 // ensure new trackers get the new limit
+            for (EndpointPayloadTracker tracker : 
requestPayloadInFlightPerEndpoint.values())
+                existingLimit = 
tracker.endpointAndGlobalPayloadsInFlight.endpoint().setLimit(newLimit);
+
+            logger.info("Changed native_max_transport_requests_in_bytes_per_ip 
from {} to {}", existingLimit, newLimit);
+        }
+
         private boolean acquire()
         {
             return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
diff --git 
a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
 
b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
index c9a9a02..3c18a75 100644
--- 
a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
+++ 
b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@ -37,6 +37,10 @@ import org.apache.cassandra.transport.messages.QueryMessage;
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class InflightRequestPayloadTrackerTest extends CQLTester
 {
+
+    private static long LOW_LIMIT = 600L;
+    private static long HIGH_LIMIT = 5000000000L;
+
     @BeforeClass
     public static void setUp()
     {
@@ -49,7 +53,7 @@ public class InflightRequestPayloadTrackerTest extends 
CQLTester
     public static void tearDown()
     {
         
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L);
-        
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L);
+        
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(HIGH_LIMIT);
     }
 
     @After
@@ -255,4 +259,149 @@ public class InflightRequestPayloadTrackerTest extends 
CQLTester
             client.close();
         }
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testChangingLimitsAtRuntime() throws Throwable
+    {
+        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                               nativePort,
+                                               ProtocolVersion.V5,
+                                               true,
+                                               new EncryptionOptions());
+        client.connect(false, true, true);
+
+        QueryOptions queryOptions = QueryOptions.create(
+        QueryOptions.DEFAULT.getConsistency(),
+        QueryOptions.DEFAULT.getValues(),
+        QueryOptions.DEFAULT.skipMetadata(),
+        QueryOptions.DEFAULT.getPageSize(),
+        QueryOptions.DEFAULT.getPagingState(),
+        QueryOptions.DEFAULT.getSerialConsistency(),
+        ProtocolVersion.V5,
+        KEYSPACE);
+
+        try
+        {
+            QueryMessage queryMessage = new QueryMessage(String.format("CREATE 
TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                         queryOptions);
+            client.execute(queryMessage);
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            try
+            {
+                client.execute(queryMessage);
+                Assert.fail();
+            }
+            catch (RuntimeException e)
+            {
+                Assert.assertTrue(e.getCause() instanceof OverloadedException);
+            }
+
+
+            // change global limit, query will still fail because endpoint 
limit
+            Server.EndpointPayloadTracker.setGlobalLimit(HIGH_LIMIT);
+            Assert.assertEquals("new global limit not returned by 
EndpointPayloadTrackers", HIGH_LIMIT, 
Server.EndpointPayloadTracker.getGlobalLimit());
+            Assert.assertEquals("new global limit not returned by 
DatabaseDescriptor", HIGH_LIMIT, 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            try
+            {
+                client.execute(queryMessage);
+                Assert.fail();
+            }
+            catch (RuntimeException e)
+            {
+                Assert.assertTrue(e.getCause() instanceof OverloadedException);
+            }
+
+            // change endpoint limit, query will still now succeed
+            Server.EndpointPayloadTracker.setEndpointLimit(HIGH_LIMIT);
+            Assert.assertEquals("new endpoint limit not returned by 
EndpointPayloadTrackers", HIGH_LIMIT, 
Server.EndpointPayloadTracker.getEndpointLimit());
+            Assert.assertEquals("new endpoint limit not returned by 
DatabaseDescriptor", HIGH_LIMIT, 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            client.execute(queryMessage);
+
+            // ensure new clients also see the new raised limits
+            client.close();
+            client = new SimpleClient(nativeAddr.getHostAddress(),
+                                       nativePort,
+                                       ProtocolVersion.V5,
+                                       true,
+                                       new EncryptionOptions());
+            client.connect(false, true, true);
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            client.execute(queryMessage);
+
+            // lower the global limit and ensure the query fails again
+            Server.EndpointPayloadTracker.setGlobalLimit(LOW_LIMIT);
+            Assert.assertEquals("new global limit not returned by 
EndpointPayloadTrackers", LOW_LIMIT, 
Server.EndpointPayloadTracker.getGlobalLimit());
+            Assert.assertEquals("new global limit not returned by 
DatabaseDescriptor", LOW_LIMIT, 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            try
+            {
+                client.execute(queryMessage);
+                Assert.fail();
+            }
+            catch (RuntimeException e)
+            {
+                Assert.assertTrue(e.getCause() instanceof OverloadedException);
+            }
+
+            // lower the endpoint limit and ensure existing clients also have 
requests that fail
+            Server.EndpointPayloadTracker.setEndpointLimit(60);
+            Assert.assertEquals("new endpoint limit not returned by 
EndpointPayloadTrackers", 60, Server.EndpointPayloadTracker.getEndpointLimit());
+            Assert.assertEquals("new endpoint limit not returned by 
DatabaseDescriptor", 60, 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
+
+            queryMessage = new QueryMessage(String.format("CREATE TABLE 
%s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                         queryOptions);
+            try
+            {
+                client.execute(queryMessage);
+                Assert.fail();
+            }
+            catch (RuntimeException e)
+            {
+                Assert.assertTrue(e.getCause() instanceof OverloadedException);
+            }
+
+            // ensure new clients also see the new lowered limit
+            client.close();
+            client = new SimpleClient(nativeAddr.getHostAddress(),
+                                      nativePort,
+                                      ProtocolVersion.V5,
+                                      true,
+                                      new EncryptionOptions());
+            client.connect(false, true, true);
+
+            queryMessage = new QueryMessage(String.format("CREATE TABLE 
%s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                            queryOptions);
+            try
+            {
+                client.execute(queryMessage);
+                Assert.fail();
+            }
+            catch (RuntimeException e)
+            {
+                Assert.assertTrue(e.getCause() instanceof OverloadedException);
+            }
+
+            // put the test state back
+            Server.EndpointPayloadTracker.setEndpointLimit(LOW_LIMIT);
+            Assert.assertEquals("new endpoint limit not returned by 
EndpointPayloadTrackers", LOW_LIMIT, 
Server.EndpointPayloadTracker.getEndpointLimit());
+            Assert.assertEquals("new endpoint limit not returned by 
DatabaseDescriptor", LOW_LIMIT, 
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
+
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to