This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 09b282d1fd Rate-limit new client connection auth setup to avoid
overwhelming bcrypt
09b282d1fd is described below
commit 09b282d1fdd7d6d62542137003011d144c0227be
Author: Josh McKenzie <[email protected]>
AuthorDate: Thu Aug 11 14:02:27 2022 -0400
Rate-limit new client connection auth setup to avoid overwhelming bcrypt
Patch by Chris Lohfink; reviewed by Caleb Rackliffe, Yifan Cai, and Josh
McKenzie for CASSANDRA-17812
Co-authored-by: Chris Lohfink <[email protected]>
Co-authored-by: Josh McKenzie <[email protected]>
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 16 ++
.../org/apache/cassandra/transport/Dispatcher.java | 47 ++++--
src/java/org/apache/cassandra/utils/Shared.java | 2 +-
.../cassandra/transport/MessageDispatcherTest.java | 172 +++++++++++++++++++++
6 files changed, 228 insertions(+), 12 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 489d2d8845..3aaaf8b38e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Rate-limit new client connection auth setup to avoid overwhelming bcrypt
(CASSANDRA-17812)
* DataOutputBuffer#scratchBuffer can use off-heap or on-heap memory as a
means to control memory allocations (CASSANDRA-16471)
* Add ability to read the TTLs and write times of the elements of a
collection and/or UDT (CASSANDRA-8877)
* Removed Python < 2.7 support from formatting.py (CASSANDRA-17694)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 68091ac90f..bdca0a7df1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -266,6 +266,8 @@ public class Config
public int native_transport_max_threads = 128;
@Replaces(oldName = "native_transport_max_frame_size_in_mb", converter =
Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
public DataStorageSpec.IntMebibytesBound native_transport_max_frame_size =
new DataStorageSpec.IntMebibytesBound("16MiB");
+ /** do bcrypt hashing in a limited pool to prevent cpu load spikes; note:
the pool is sized to at least 1 thread on init, but any value < 1 routes auth requests to the standard request pool **/
+ public int native_transport_max_auth_threads = 4;
public volatile long native_transport_max_concurrent_connections = -1L;
public volatile long native_transport_max_concurrent_connections_per_ip =
-1L;
public boolean native_transport_flush_in_batches_legacy = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1ce16052fe..0a9036f632 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2573,6 +2573,22 @@ public class DatabaseDescriptor
conf.native_transport_max_threads = max_threads;
}
+ public static Integer getNativeTransportMaxAuthThreads()
+ {
+ return conf.native_transport_max_auth_threads;
+ }
+
+ /**
+ * If this value is set to <= 0, auth requests are moved to the standard
request pool regardless of the current
+ * size of the {@link
org.apache.cassandra.transport.Dispatcher#authExecutor} pool.
+ *
+ * see {@link org.apache.cassandra.transport.Dispatcher#dispatch} for
executor selection
+ */
+ public static void setNativeTransportMaxAuthThreads(int threads)
+ {
+ conf.native_transport_max_auth_threads = threads;
+ }
+
public static int getNativeTransportMaxFrameSize()
{
return conf.native_transport_max_frame_size.toBytes();
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 8f8a607c77..f21acc2c6d 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,11 +52,31 @@ import static
org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
public class Dispatcher
{
private static final Logger logger =
LoggerFactory.getLogger(Dispatcher.class);
-
- private static final LocalAwareExecutorPlus requestExecutor =
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
-
DatabaseDescriptor::setNativeTransportMaxThreads,
-
"transport",
-
"Native-Transport-Requests");
+
+ @VisibleForTesting
+ static final LocalAwareExecutorPlus requestExecutor =
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+
DatabaseDescriptor::setNativeTransportMaxThreads,
+
"transport",
+
"Native-Transport-Requests");
+
+ /** CASSANDRA-17812: Rate-limit new client connection setup to avoid
overwhelming bcrypt
+ *
+ * authExecutor is a separate thread pool for handling requests on
connections that need to be authenticated.
+ * Calls to AUTHENTICATE can be expensive if the number of rounds for
bcrypt is configured to a high value,
+ * so during a connection storm checking the password hash would starve
existing connected clients for CPU and
+ * trigger timeouts if on the same thread pool as standard requests.
+ *
+ * Moving authentication requests to a small, separate pool prevents
them from starving the handling of all other
+ * requests. If the authExecutor pool backs up, it may cause
authentication timeouts but the clients should
+ * back off and retry while the rest of the system continues to make
progress.
+ *
+ * Setting native_transport_max_auth_threads to less than 1 will service
auth requests on the standard {@link Dispatcher#requestExecutor}
+ */
+ @VisibleForTesting
+ static final LocalAwareExecutorPlus authExecutor =
SHARED.newExecutor(Math.max(1,
DatabaseDescriptor.getNativeTransportMaxAuthThreads()),
+
DatabaseDescriptor::setNativeTransportMaxAuthThreads,
+
"transport",
+
"Native-Transport-Auth-Requests");
private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new
ConcurrentHashMap<>();
private final boolean useLegacyFlusher;
@@ -80,7 +101,14 @@ public class Dispatcher
public void dispatch(Channel channel, Message.Request request,
FlushItemConverter forFlusher, Overload backpressure)
{
- requestExecutor.submit(new RequestProcessor(channel, request,
forFlusher, backpressure));
+ // if native_transport_max_auth_threads is < 1, don't delegate to new
pool on auth messages
+ boolean isAuthQuery =
DatabaseDescriptor.getNativeTransportMaxAuthThreads() > 0 &&
+ (request.type == Message.Type.AUTH_RESPONSE ||
request.type == Message.Type.CREDENTIALS);
+
+ // Importantly, the authExecutor will handle the AUTH_RESPONSE and
CREDENTIALS messages, which may be CPU intensive.
+ LocalAwareExecutorPlus executor = isAuthQuery ? authExecutor :
requestExecutor;
+
+ executor.submit(new RequestProcessor(channel, request, forFlusher,
backpressure));
ClientMetrics.instance.markRequestDispatched();
}
@@ -233,13 +261,10 @@ public class Dispatcher
public static void shutdown()
{
- if (requestExecutor != null)
- {
- requestExecutor.shutdown();
- }
+ requestExecutor.shutdown();
+ authExecutor.shutdown();
}
-
/**
* Dispatcher for EventMessages. In {@link
Server.ConnectionTracker#send(Event)}, the strategy
* for delivering events to registered clients is dependent on protocol
version and the configuration
diff --git a/src/java/org/apache/cassandra/utils/Shared.java
b/src/java/org/apache/cassandra/utils/Shared.java
index e576c8676c..6433624911 100644
--- a/src/java/org/apache/cassandra/utils/Shared.java
+++ b/src/java/org/apache/cassandra/utils/Shared.java
@@ -24,7 +24,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Tells jvm-dtest that a class should be shared accross all {@link
ClassLoader}s.
+ * Tells jvm-dtest that a class should be shared across all {@link
ClassLoader}s.
*
* Jvm-dtest relies on classloader isolation to run multiple cassandra
instances in the same JVM, this makes it
* so some classes do not get shared (outside a blessed set of
classes/packages). When the default behavior
diff --git
a/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java
b/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java
new file mode 100644
index 0000000000..0c70315e25
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.channel.Channel;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.ClientMetrics;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.AuthResponse;
+
+public class MessageDispatcherTest
+{
+ static final Message.Request AUTH_RESPONSE_REQUEST = new AuthResponse(new
byte[0])
+ {
+ public Response execute(QueryState queryState, long
queryStartNanoTime, boolean traceRequest)
+ {
+ return null;
+ }
+ };
+
+ private static AuthTestDispatcher dispatch;
+ private static int maxAuthThreadsBeforeTests;
+
+ @BeforeClass
+ public static void init() throws Exception
+ {
+ DatabaseDescriptor.daemonInitialization();
+ ClientMetrics.instance.init(Collections.emptyList());
+ maxAuthThreadsBeforeTests =
DatabaseDescriptor.getNativeTransportMaxAuthThreads();
+ dispatch = new AuthTestDispatcher();
+ }
+
+ @AfterClass
+ public static void restoreAuthSize()
+ {
+
DatabaseDescriptor.setNativeTransportMaxAuthThreads(maxAuthThreadsBeforeTests);
+ }
+
+ @Test
+ public void testAuthRateLimiter() throws Exception
+ {
+ long startRequests = completedRequests();
+
+ DatabaseDescriptor.setNativeTransportMaxAuthThreads(1);
+ long auths = tryAuth(this::completedAuth);
+ Assert.assertEquals(auths, 1);
+
+ DatabaseDescriptor.setNativeTransportMaxAuthThreads(100);
+ auths = tryAuth(this::completedAuth);
+ Assert.assertEquals(auths, 1);
+
+ // Make sure no tasks executed on the regular pool
+ Assert.assertEquals(startRequests, completedRequests());
+ }
+
+ @Test
+ public void testAuthRateLimiterNotUsed() throws Exception
+ {
+ DatabaseDescriptor.setNativeTransportMaxAuthThreads(1);
+ for (Message.Type type : Message.Type.values())
+ {
+ if (type == Message.Type.AUTH_RESPONSE || type ==
Message.Type.CREDENTIALS || type.direction != Message.Direction.REQUEST)
+ continue;
+
+ long auths = completedAuth();
+ long requests = tryAuth(this::completedRequests, new
Message.Request(type)
+ {
+ public Response execute(QueryState queryState, long
queryStartNanoTime, boolean traceRequest)
+ {
+ return null;
+ }
+ });
+ Assert.assertEquals(requests, 1);
+ Assert.assertEquals(completedAuth() - auths, 0);
+ }
+ }
+
+ @Test
+ public void testAuthRateLimiterDisabled() throws Exception
+ {
+ long startAuthRequests = completedAuth();
+
+ DatabaseDescriptor.setNativeTransportMaxAuthThreads(0);
+ long requests = tryAuth(this::completedRequests);
+ Assert.assertEquals(requests, 1);
+
+ DatabaseDescriptor.setNativeTransportMaxAuthThreads(-1);
+ requests = tryAuth(this::completedRequests);
+ Assert.assertEquals(requests, 1);
+
+ DatabaseDescriptor.setNativeTransportMaxAuthThreads(-1000);
+ requests = tryAuth(this::completedRequests);
+ Assert.assertEquals(requests, 1);
+
+ // Make sure no tasks executed on the auth pool
+ Assert.assertEquals(startAuthRequests, completedAuth());
+ }
+
+ private long completedRequests()
+ {
+ return Dispatcher.requestExecutor.getCompletedTaskCount();
+ }
+
+ private long completedAuth()
+ {
+ return Dispatcher.authExecutor.getCompletedTaskCount();
+ }
+
+ public long tryAuth(Callable<Long> check) throws Exception
+ {
+ return tryAuth(check, AUTH_RESPONSE_REQUEST);
+ }
+
+ @SuppressWarnings("UnstableApiUsage")
+ public long tryAuth(Callable<Long> check, Message.Request request) throws
Exception
+ {
+ long start = check.call();
+ dispatch.dispatch(null, request, (channel,req,response) -> null,
ClientResourceLimits.Overload.NONE);
+
+ // While this is timeout based, we should be *well below* a full
second on any of this processing in any sane environment.
+ long timeout = System.currentTimeMillis();
+ while(start == check.call() && System.currentTimeMillis() - timeout <
1000)
+ {
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+ }
+ return check.call() - start;
+ }
+
+ public static class AuthTestDispatcher extends Dispatcher
+ {
+ public AuthTestDispatcher()
+ {
+ super(false);
+ }
+
+ @Override
+ void processRequest(Channel channel,
+ Message.Request request,
+ FlushItemConverter forFlusher,
+ ClientResourceLimits.Overload backpressure,
+ long approxStartTimeNanos)
+ {
+ // noop
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]