This is an automated email from the ASF dual-hosted git repository.
clohfink pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new da90439 Make native_transport_max_concurrent_requests_in_bytes
updatable
da90439 is described below
commit da90439f0052c3a05aaf6d4268a8c719e10fafde
Author: Jordan West <[email protected]>
AuthorDate: Fri Feb 7 08:24:14 2020 -0800
Make native_transport_max_concurrent_requests_in_bytes updatable
Patch by Jordan West; Reviewed by Chris Lohfink CASSANDRA-15519
---
CHANGES.txt | 1 +
.../org/apache/cassandra/net/ResourceLimits.java | 38 ++++-
.../apache/cassandra/service/StorageService.java | 25 ++++
.../cassandra/service/StorageServiceMBean.java | 7 +
.../org/apache/cassandra/transport/Server.java | 28 ++++
.../InflightRequestPayloadTrackerTest.java | 153 ++++++++++++++++++++-
6 files changed, 248 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 997d9d0..33a0581 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Make native_transport_max_concurrent_requests_in_bytes updatable
(CASSANDRA-15519)
* Cleanup and improvements to IndexInfo/ColumnIndex (CASSANDRA-15469)
* Potential Overflow in DatabaseDescriptor Functions That Convert Between
KB/MB & Bytes (CASSANDRA-15470)
4.0-alpha3
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java
b/src/java/org/apache/cassandra/net/ResourceLimits.java
index f8d24d7..4c97c2a 100644
--- a/src/java/org/apache/cassandra/net/ResourceLimits.java
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -36,6 +36,20 @@ public abstract class ResourceLimits
long limit();
/**
+ * Sets the total amount of permits represented by this {@link Limit}
- the capacity
+ *
+ * If the old limit has been reached and the new limit is large enough
to allow for more
+ * permits to be aqcuired, subsequent calls to {@link #allocate(long)}
or {@link #tryAllocate(long)}
+ * will succeed.
+ *
+ * If the new limit is lower than the current amount of allocated
permits then subsequent calls
+ * to {@link #allocate(long)} or {@link #tryAllocate(long)} will block
or fail respectively.
+ *
+ * @return the old limit
+ */
+ long setLimit(long newLimit);
+
+ /**
* @return remaining, unallocated permit amount
*/
long remaining();
@@ -73,7 +87,9 @@ public abstract class ResourceLimits
*/
public static class Concurrent implements Limit
{
- private final long limit;
+ private volatile long limit;
+ private static final AtomicLongFieldUpdater<Concurrent> limitUpdater =
+ AtomicLongFieldUpdater.newUpdater(Concurrent.class, "limit");
private volatile long using;
private static final AtomicLongFieldUpdater<Concurrent> usingUpdater =
@@ -89,6 +105,16 @@ public abstract class ResourceLimits
return limit;
}
+ public long setLimit(long newLimit)
+ {
+ long oldLimit;
+ do {
+ oldLimit = limit;
+ } while (!limitUpdater.compareAndSet(this, oldLimit, newLimit));
+
+ return oldLimit;
+ }
+
public long remaining()
{
return limit - using;
@@ -139,7 +165,7 @@ public abstract class ResourceLimits
*/
static class Basic implements Limit
{
- private final long limit;
+ private long limit;
private long using;
Basic(long limit)
@@ -152,6 +178,14 @@ public abstract class ResourceLimits
return limit;
}
+ public long setLimit(long newLimit)
+ {
+ long oldLimit = limit;
+ limit = newLimit;
+
+ return oldLimit;
+ }
+
public long remaining()
{
return limit - using;
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 6dd5d37..aa03aea 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -96,6 +96,7 @@ import org.apache.cassandra.schema.ViewMetadata;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.logging.LoggingSupportFactory;
import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -5447,6 +5448,30 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
logger.info("Setting corrupted tombstone strategy to {}", strategy);
}
+ @Override
+ public long getNativeTransportMaxConcurrentRequestsInBytes()
+ {
+ return Server.EndpointPayloadTracker.getGlobalLimit();
+ }
+
+ @Override
+ public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit)
+ {
+ Server.EndpointPayloadTracker.setGlobalLimit(newLimit);
+ }
+
+ @Override
+ public long getNativeTransportMaxConcurrentRequestsInBytesPerIp()
+ {
+ return Server.EndpointPayloadTracker.getEndpointLimit();
+ }
+
+ @Override
+ public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long
newLimit)
+ {
+ Server.EndpointPayloadTracker.setEndpointLimit(newLimit);
+ }
+
@VisibleForTesting
public void shutdownServer()
{
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index a324a1c..dea25b4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -522,6 +522,13 @@ public interface StorageServiceMBean extends
NotificationEmitter
public void enableNativeTransportOldProtocolVersions();
public void disableNativeTransportOldProtocolVersions();
+ // sets limits on number of concurrent requests in flights in number of
bytes
+ public long getNativeTransportMaxConcurrentRequestsInBytes();
+ public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit);
+ public long getNativeTransportMaxConcurrentRequestsInBytesPerIp();
+ public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long
newLimit);
+
+
// allows a node that have been started without joining the ring to join it
public void joinRing() throws IOException;
public boolean isJoined();
diff --git a/src/java/org/apache/cassandra/transport/Server.java
b/src/java/org/apache/cassandra/transport/Server.java
index c4690f1..43b024f 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -366,6 +366,34 @@ public class Server implements CassandraDaemon.Server
}
}
+ public static long getGlobalLimit()
+ {
+ return
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes();
+ }
+
+ public static void setGlobalLimit(long newLimit)
+ {
+
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(newLimit);
+ long existingLimit =
globalRequestPayloadInFlight.setLimit(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+ logger.info("Changed native_max_transport_requests_in_bytes from
{} to {}", existingLimit, newLimit);
+ }
+
+ public static long getEndpointLimit()
+ {
+ return
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
+ }
+
+ public static void setEndpointLimit(long newLimit)
+ {
+ long existingLimit =
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
+
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(newLimit);
// ensure new trackers get the new limit
+ for (EndpointPayloadTracker tracker :
requestPayloadInFlightPerEndpoint.values())
+ existingLimit =
tracker.endpointAndGlobalPayloadsInFlight.endpoint().setLimit(newLimit);
+
+ logger.info("Changed native_max_transport_requests_in_bytes_per_ip
from {} to {}", existingLimit, newLimit);
+ }
+
private boolean acquire()
{
return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
diff --git
a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
index c9a9a02..3c18a75 100644
---
a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
+++
b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@ -37,6 +37,10 @@ import org.apache.cassandra.transport.messages.QueryMessage;
@RunWith(OrderedJUnit4ClassRunner.class)
public class InflightRequestPayloadTrackerTest extends CQLTester
{
+
+ private static long LOW_LIMIT = 600L;
+ private static long HIGH_LIMIT = 5000000000L;
+
@BeforeClass
public static void setUp()
{
@@ -49,7 +53,7 @@ public class InflightRequestPayloadTrackerTest extends
CQLTester
public static void tearDown()
{
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L);
-
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L);
+
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(HIGH_LIMIT);
}
@After
@@ -255,4 +259,149 @@ public class InflightRequestPayloadTrackerTest extends
CQLTester
client.close();
}
}
-}
\ No newline at end of file
+
+ @Test
+ public void testChangingLimitsAtRuntime() throws Throwable
+ {
+ SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ ProtocolVersion.V5,
+ true,
+ new EncryptionOptions());
+ client.connect(false, true, true);
+
+ QueryOptions queryOptions = QueryOptions.create(
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ ProtocolVersion.V5,
+ KEYSPACE);
+
+ try
+ {
+ QueryMessage queryMessage = new QueryMessage(String.format("CREATE
TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ client.execute(queryMessage);
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO
%s.atable (pk, v) VALUES (1,
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
[...]
+ queryOptions);
+ try
+ {
+ client.execute(queryMessage);
+ Assert.fail();
+ }
+ catch (RuntimeException e)
+ {
+ Assert.assertTrue(e.getCause() instanceof OverloadedException);
+ }
+
+
+ // change global limit, query will still fail because endpoint
limit
+ Server.EndpointPayloadTracker.setGlobalLimit(HIGH_LIMIT);
+ Assert.assertEquals("new global limit not returned by
EndpointPayloadTrackers", HIGH_LIMIT,
Server.EndpointPayloadTracker.getGlobalLimit());
+ Assert.assertEquals("new global limit not returned by
DatabaseDescriptor", HIGH_LIMIT,
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO
%s.atable (pk, v) VALUES (1,
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
[...]
+ queryOptions);
+ try
+ {
+ client.execute(queryMessage);
+ Assert.fail();
+ }
+ catch (RuntimeException e)
+ {
+ Assert.assertTrue(e.getCause() instanceof OverloadedException);
+ }
+
+ // change endpoint limit, query will still now succeed
+ Server.EndpointPayloadTracker.setEndpointLimit(HIGH_LIMIT);
+ Assert.assertEquals("new endpoint limit not returned by
EndpointPayloadTrackers", HIGH_LIMIT,
Server.EndpointPayloadTracker.getEndpointLimit());
+ Assert.assertEquals("new endpoint limit not returned by
DatabaseDescriptor", HIGH_LIMIT,
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO
%s.atable (pk, v) VALUES (1,
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
[...]
+ queryOptions);
+ client.execute(queryMessage);
+
+ // ensure new clients also see the new raised limits
+ client.close();
+ client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ ProtocolVersion.V5,
+ true,
+ new EncryptionOptions());
+ client.connect(false, true, true);
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO
%s.atable (pk, v) VALUES (1,
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
[...]
+ queryOptions);
+ client.execute(queryMessage);
+
+ // lower the global limit and ensure the query fails again
+ Server.EndpointPayloadTracker.setGlobalLimit(LOW_LIMIT);
+ Assert.assertEquals("new global limit not returned by
EndpointPayloadTrackers", LOW_LIMIT,
Server.EndpointPayloadTracker.getGlobalLimit());
+ Assert.assertEquals("new global limit not returned by
DatabaseDescriptor", LOW_LIMIT,
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO
%s.atable (pk, v) VALUES (1,
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
[...]
+ queryOptions);
+ try
+ {
+ client.execute(queryMessage);
+ Assert.fail();
+ }
+ catch (RuntimeException e)
+ {
+ Assert.assertTrue(e.getCause() instanceof OverloadedException);
+ }
+
+ // lower the endpoint limit and ensure existing clients also have
requests that fail
+ Server.EndpointPayloadTracker.setEndpointLimit(60);
+ Assert.assertEquals("new endpoint limit not returned by
EndpointPayloadTrackers", 60, Server.EndpointPayloadTracker.getEndpointLimit());
+ Assert.assertEquals("new endpoint limit not returned by
DatabaseDescriptor", 60,
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
+
+ queryMessage = new QueryMessage(String.format("CREATE TABLE
%s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ try
+ {
+ client.execute(queryMessage);
+ Assert.fail();
+ }
+ catch (RuntimeException e)
+ {
+ Assert.assertTrue(e.getCause() instanceof OverloadedException);
+ }
+
+ // ensure new clients also see the new lowered limit
+ client.close();
+ client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ ProtocolVersion.V5,
+ true,
+ new EncryptionOptions());
+ client.connect(false, true, true);
+
+ queryMessage = new QueryMessage(String.format("CREATE TABLE
%s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ try
+ {
+ client.execute(queryMessage);
+ Assert.fail();
+ }
+ catch (RuntimeException e)
+ {
+ Assert.assertTrue(e.getCause() instanceof OverloadedException);
+ }
+
+ // put the test state back
+ Server.EndpointPayloadTracker.setEndpointLimit(LOW_LIMIT);
+ Assert.assertEquals("new endpoint limit not returned by
EndpointPayloadTrackers", LOW_LIMIT,
Server.EndpointPayloadTracker.getEndpointLimit());
+ Assert.assertEquals("new endpoint limit not returned by
DatabaseDescriptor", LOW_LIMIT,
DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
+
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]