This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b34e18 CASSANDRASC-124 Fix wait time acquired in SidecarRateLimiter
(#115)
6b34e18 is described below
commit 6b34e1883e69c0c1a3f260000777cdb0cfe91625
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Tue Apr 30 20:02:55 2024 -0700
CASSANDRASC-124 Fix wait time acquired in SidecarRateLimiter (#115)
Patch by Saranya Krishnakumar; Reviewed by Yifan Cai, Francisco Guerrero
for CASSANDRASC-124
---
CHANGES.txt | 1 +
src/main/dist/conf/sidecar.yaml | 1 -
.../common/util/concurrent/SidecarRateLimiter.java | 76 +++++++++++++--------
.../sidecar/config/ThrottleConfiguration.java | 9 ++-
.../config/yaml/ThrottleConfigurationImpl.java | 27 +++-----
.../cassandra/sidecar/models/HttpResponse.java | 7 +-
.../cassandra/sidecar/utils/FileStreamer.java | 79 +++++++++++-----------
.../util/concurrent/SidecarRateLimiterTest.java | 44 +++++++++++-
.../org/apache/cassandra/sidecar/TestModule.java | 2 +-
.../org/apache/cassandra/sidecar/ThrottleTest.java | 71 +++++++++----------
.../sidecar/config/SidecarConfigurationTest.java | 1 -
.../resources/config/sidecar_file_permissions.yaml | 1 -
.../config/sidecar_invalid_client_auth.yaml | 1 -
.../config/sidecar_invalid_file_permissions.yaml | 1 -
src/test/resources/config/sidecar_metrics.yaml | 1 -
.../config/sidecar_metrics_empty_filters.yaml | 1 -
src/test/resources/config/sidecar_missing_jmx.yaml | 1 -
.../config/sidecar_missing_sstable_snapshot.yaml | 1 -
.../config/sidecar_multiple_instances.yaml | 1 -
.../config/sidecar_no_local_instances.yaml | 1 -
.../sidecar_schema_keyspace_configuration.yaml | 1 -
.../resources/config/sidecar_single_instance.yaml | 1 -
src/test/resources/config/sidecar_ssl.yaml | 1 -
.../config/sidecar_validation_configuration.yaml | 1 -
.../sidecar_with_single_multiple_instances.yaml | 1 -
25 files changed, 186 insertions(+), 146 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7968525..b2a3fd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Fix wait time acquired in SidecarRateLimiter (CASSANDRASC-124)
* Make RestoreJobDiscoverer less verbose (CASSANDRASC-126)
* Import Queue pendingImports metrics is reporting an incorrect value
(CASSANDRASC-125)
* Add missing method to retrieve the InetSocketAddress to DriverUtils
(CASSANDRASC-123)
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index 734dcc4..31310c7 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -73,7 +73,6 @@ sidecar:
server_verticle_instances: 1
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 0 # 0 implies unthrottled, the
inbound bandwidth in bytes per second
diff --git
a/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
index b5abc14..3b025bf 100644
--- a/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
+++ b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
@@ -28,17 +28,16 @@ import
com.google.errorprone.annotations.CanIgnoreReturnValue;
* <p>
* In addition to Guava's Rate Limiter functionality, it adds support for
disabling rate-limiting.
*/
-@SuppressWarnings("UnstableApiUsage")
public class SidecarRateLimiter
{
- private final AtomicReference<RateLimiter> ref = new
AtomicReference<>(null);
+ private final AtomicReference<RateLimiterWrapper> ref = new
AtomicReference<>(null);
private SidecarRateLimiter(final double permitsPerSecond)
{
if (permitsPerSecond > 0)
{
- RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);
- ref.set(rateLimiter);
+ RateLimiterWrapper rateLimiterWrapper =
RateLimiterWrapper.create(permitsPerSecond);
+ ref.set(rateLimiterWrapper);
}
}
@@ -55,21 +54,16 @@ public class SidecarRateLimiter
return new SidecarRateLimiter(permitsPerSecond);
}
- // Attention: Hack to expose the package private method
queryEarliestAvailable
-
/**
- * Returns the earliest time permits will become available. Returns 0 if
disabled.
- *
- * <br><b>Note:</b> this is a hack to expose the package private method
- * {@link RateLimiter#queryEarliestAvailable(long)}
+ * Returns calculated wait time in micros for next available permit.
Permit is not reserved during calculation,
+ * this wait time is an approximation.
*
- * @param nowMicros current time in micros
- * @return earliest time permits will become available
+ * @return approx wait time in micros for next available permit
*/
- public long queryEarliestAvailable(final long nowMicros)
+ public long queryWaitTimeInMicros()
{
- RateLimiter rateLimiter = ref.get();
- return rateLimiter != null ?
rateLimiter.queryEarliestAvailable(nowMicros) : 0;
+ RateLimiterWrapper rateLimiterWrapper = ref.get();
+ return rateLimiterWrapper != null ?
rateLimiterWrapper.queryWaitTimeInMicros() : 0;
}
/**
@@ -79,8 +73,8 @@ public class SidecarRateLimiter
*/
public boolean tryAcquire()
{
- RateLimiter rateLimiter = ref.get();
- return rateLimiter == null || rateLimiter.tryAcquire();
+ RateLimiterWrapper rateLimiterWrapper = ref.get();
+ return rateLimiterWrapper == null ||
rateLimiterWrapper.rateLimiter.tryAcquire();
}
/**
@@ -93,17 +87,17 @@ public class SidecarRateLimiter
*/
public void rate(double permitsPerSecond)
{
- RateLimiter rateLimiter = ref.get();
+ RateLimiterWrapper rateLimiterWrapper = ref.get();
if (permitsPerSecond > 0.0)
{
- if (rateLimiter == null)
+ if (rateLimiterWrapper == null)
{
- ref.compareAndSet(null, RateLimiter.create(permitsPerSecond));
+ ref.compareAndSet(null,
RateLimiterWrapper.create(permitsPerSecond));
}
else
{
- rateLimiter.setRate(permitsPerSecond);
+ rateLimiterWrapper.rateLimiter.setRate(permitsPerSecond);
}
}
else
@@ -123,8 +117,8 @@ public class SidecarRateLimiter
*/
public double rate()
{
- RateLimiter rateLimiter = ref.get();
- return rateLimiter != null ? rateLimiter.getRate() : 0;
+ RateLimiterWrapper rateLimiterWrapper = ref.get();
+ return rateLimiterWrapper != null ?
rateLimiterWrapper.rateLimiter.getRate() : 0;
}
/**
@@ -139,8 +133,8 @@ public class SidecarRateLimiter
@CanIgnoreReturnValue
public double acquire()
{
- RateLimiter rateLimiter = ref.get();
- return rateLimiter != null ? rateLimiter.acquire() : 0;
+ RateLimiterWrapper rateLimiterWrapper = ref.get();
+ return rateLimiterWrapper != null ?
rateLimiterWrapper.rateLimiter.acquire() : 0;
}
/**
@@ -156,7 +150,35 @@ public class SidecarRateLimiter
@CanIgnoreReturnValue
public double acquire(int permits)
{
- RateLimiter rateLimiter = ref.get();
- return rateLimiter != null && permits > 0 ?
rateLimiter.acquire(permits) : 0;
+ RateLimiterWrapper rateLimiterWrapper = ref.get();
+ return rateLimiterWrapper != null && permits > 0 ?
rateLimiterWrapper.rateLimiter.acquire(permits) : 0;
+ }
+
+ // Attention: Hack to expose the package private method
queryEarliestAvailable and RateLimiter.SleepingStopwatch
+ @SuppressWarnings("UnstableApiUsage")
+ private static class RateLimiterWrapper
+ {
+ private final RateLimiter rateLimiter;
+ private final RateLimiter.SleepingStopwatch stopwatch;
+
+ private RateLimiterWrapper(RateLimiter rateLimiter,
RateLimiter.SleepingStopwatch stopwatch)
+ {
+ this.rateLimiter = rateLimiter;
+ this.stopwatch = stopwatch;
+ }
+
+ static RateLimiterWrapper create(double permitsPerSecond)
+ {
+ RateLimiter.SleepingStopwatch stopwatch =
RateLimiter.SleepingStopwatch.createFromSystemTimer();
+ RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond,
stopwatch);
+ return new RateLimiterWrapper(rateLimiter, stopwatch);
+ }
+
+ public long queryWaitTimeInMicros()
+ {
+ long earliestAvailableMicros =
rateLimiter.queryEarliestAvailable(0);
+ long nowMicros = stopwatch.readMicros();
+ return Math.max(earliestAvailableMicros - nowMicros, 0);
+ }
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java
b/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java
index e4c8af7..0447d35 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java
@@ -23,9 +23,14 @@ package org.apache.cassandra.sidecar.config;
*/
public interface ThrottleConfiguration
{
+ /**
+ * @return rate at which stream requests will be accepted.
+ */
long rateLimitStreamRequestsPerSecond();
+ /**
+ * @return timeout in seconds used to determine when to stop retrying
stream requests when stream requests
+ * are throttled.
+ */
long timeoutInSeconds();
-
- long delayInSeconds();
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
index 24f45ce..b755c95 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
@@ -28,41 +28,36 @@ public class ThrottleConfigurationImpl implements
ThrottleConfiguration
{
public static final long DEFAULT_STREAM_REQUESTS_PER_SEC = 5000;
public static final long DEFAULT_TIMEOUT_SEC = 10;
- public static final long DEFAULT_DELAY_SEC = 5;
public static final String STREAM_REQUESTS_PER_SEC_PROPERTY =
"stream_requests_per_sec";
public static final String TIMEOUT_SEC_PROPERTY = "timeout_sec";
- public static final String DELAY_SEC_PROPERTY = "delay_sec";
@JsonProperty(value = STREAM_REQUESTS_PER_SEC_PROPERTY)
protected final long rateLimitStreamRequestsPerSecond;
@JsonProperty(value = TIMEOUT_SEC_PROPERTY)
protected final long timeoutInSeconds;
- @JsonProperty(value = DELAY_SEC_PROPERTY)
- protected final long delayInSeconds;
public ThrottleConfigurationImpl()
{
this(DEFAULT_STREAM_REQUESTS_PER_SEC,
- DEFAULT_TIMEOUT_SEC,
- DEFAULT_DELAY_SEC);
+ DEFAULT_TIMEOUT_SEC);
}
public ThrottleConfigurationImpl(long rateLimitStreamRequestsPerSecond)
{
this(rateLimitStreamRequestsPerSecond,
- DEFAULT_TIMEOUT_SEC,
- DEFAULT_DELAY_SEC);
+ DEFAULT_TIMEOUT_SEC);
}
public ThrottleConfigurationImpl(long rateLimitStreamRequestsPerSecond,
- long timeoutInSeconds,
- long delayInSeconds)
+ long timeoutInSeconds)
{
this.rateLimitStreamRequestsPerSecond =
rateLimitStreamRequestsPerSecond;
this.timeoutInSeconds = timeoutInSeconds;
- this.delayInSeconds = delayInSeconds;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
@JsonProperty(value = STREAM_REQUESTS_PER_SEC_PROPERTY)
public long rateLimitStreamRequestsPerSecond()
@@ -70,17 +65,13 @@ public class ThrottleConfigurationImpl implements
ThrottleConfiguration
return rateLimitStreamRequestsPerSecond;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
@JsonProperty(value = TIMEOUT_SEC_PROPERTY)
public long timeoutInSeconds()
{
return timeoutInSeconds;
}
-
- @Override
- @JsonProperty(value = DELAY_SEC_PROPERTY)
- public long delayInSeconds()
- {
- return delayInSeconds;
- }
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
index 9b798f6..960d116 100644
--- a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
+++ b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
@@ -28,7 +28,7 @@ import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.net.SocketAddress;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* Wrapper around HttpServerResponse
@@ -46,10 +46,11 @@ public class HttpResponse
this.host = request.host();
}
- public void setRetryAfterHeader(long microsToWait)
+ public void setRetryAfterHeader(long waitTimeNanos)
{
response.setStatusCode(HttpResponseStatus.TOO_MANY_REQUESTS.code());
- response.putHeader(HttpHeaderNames.RETRY_AFTER,
Long.toString(MICROSECONDS.toSeconds(microsToWait))).end();
+ // round up when converting to second value
+ response.putHeader(HttpHeaderNames.RETRY_AFTER,
Long.toString(NANOSECONDS.toSeconds(waitTimeNanos) + 1)).end();
}
public void setTooManyRequestsStatus()
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
index eb1b129..1825863 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.sidecar.utils;
-import java.time.Duration;
-import java.time.Instant;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.SidecarRateLimiter;
@@ -103,7 +101,14 @@ public class FileStreamer
public Future<Void> stream(HttpResponse response, int instanceId, String
filename, long fileLength, HttpRange range)
{
Promise<Void> promise = Promise.promise();
- acquireAndSend(response, instanceId, filename, fileLength, range,
Instant.now(), promise);
+ try
+ {
+ acquireAndSend(response, instanceId, filename, fileLength, range,
System.nanoTime(), promise);
+ }
+ catch (Throwable t)
+ {
+ promise.tryFail(t);
+ }
return promise.future();
}
@@ -111,25 +116,25 @@ public class FileStreamer
* Send the file if rate-limiting is disabled or when it successfully
acquires a permit from the
* {@link SidecarRateLimiter}.
*
- * @param response the response to use
- * @param instanceId Cassandra instance from which file is streamed
- * @param filename the path to the file to serve
- * @param fileLength the size of the file to serve
- * @param range the range to stream
- * @param startTime the start time of this request
- * @param promise a promise for the stream
+ * @param response the response to use
+ * @param instanceId Cassandra instance from which file is streamed
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param range the range to stream
+ * @param startTimeNanos the start time of this request
+ * @param promise a promise for the stream
*/
private void acquireAndSend(HttpResponse response,
int instanceId,
String filename,
long fileLength,
HttpRange range,
- Instant startTime,
+ long startTimeNanos,
Promise<Void> promise)
{
InstanceMetrics instanceMetrics =
instanceMetadataFetcher.instance(instanceId).metrics();
StreamSSTableMetrics streamSSTableMetrics =
instanceMetrics.streamSSTable();
- if (acquire(response, instanceId, filename, fileLength, range,
startTime, streamSSTableMetrics, promise))
+ if (acquire(response, instanceId, filename, fileLength, range,
startTimeNanos, streamSSTableMetrics, promise))
{
// Stream data if rate limiting is disabled or if we acquire
LOGGER.debug("Streaming range {} for file {} to client {}.
Instance: {}", range, filename,
@@ -158,55 +163,53 @@ public class FileStreamer
* @param filename the path to the file to serve
* @param fileLength the size of the file to serve
* @param range the range to stream
- * @param startTime the start time of this request
+ * @param startTimeNanos the start time of this request
* @param streamSSTableMetrics metrics captured during streaming of
SSTables
* @param promise a promise for the stream
* @return {@code true} if the permit was acquired, {@code false} otherwise
*/
private boolean acquire(HttpResponse response, int instanceId, String
filename, long fileLength, HttpRange range,
- Instant startTime, StreamSSTableMetrics
streamSSTableMetrics, Promise<Void> promise)
+ long startTimeNanos, StreamSSTableMetrics
streamSSTableMetrics, Promise<Void> promise)
{
if (rateLimiter.tryAcquire())
return true;
- long microsToWait;
- if (checkRetriesExhausted(startTime))
+ long waitTimeNanos =
MICROSECONDS.toNanos(rateLimiter.queryWaitTimeInMicros());
+ if (isTimeoutExceeded(startTimeNanos, waitTimeNanos))
{
- LOGGER.error("Retries for acquiring permit exhausted for client
{}. Instance: {}", response.remoteAddress(),
- response.host());
+ LOGGER.warn("Retries for acquiring permit exhausted for client {}.
Instance: {}. " +
+ "Asking client to retry after {} nanoseconds.",
response.remoteAddress(), response.host(),
+ waitTimeNanos);
+ response.setRetryAfterHeader(waitTimeNanos);
+ streamSSTableMetrics.throttled.metric.update(1);
promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Retry
exhausted"));
}
- else if ((microsToWait = rateLimiter.queryEarliestAvailable(0L))
- < TimeUnit.SECONDS.toMicros(config.delayInSeconds()))
+ else
{
- microsToWait = Math.max(0, microsToWait);
-
- LOGGER.debug("Retrying streaming after {} micros for client {}.
Instance: {}", microsToWait,
+ LOGGER.debug("Retrying streaming after {} nanos for client {}.
Instance: {}", waitTimeNanos,
response.remoteAddress(), response.host());
executorPools.service()
- .setTimer(MICROSECONDS.toMillis(microsToWait),
+ // Note: adding an extra millisecond is required for
2 reasons
+ // 1. setTimer does not like scheduling with 0 delay;
it throws
+ // 2. the retry should be scheduled later than the
waitTimeNanos, in order to ensure it can acquire
+
.setTimer(TimeUnit.NANOSECONDS.toMillis(waitTimeNanos) + 1,
t -> acquireAndSend(response, instanceId,
filename, fileLength, range,
- startTime, promise));
- }
- else
- {
- LOGGER.debug("Asking client {} to retry after {} micros. Instance:
{}", response.remoteAddress(),
- microsToWait, response.host());
- response.setRetryAfterHeader(microsToWait);
- streamSSTableMetrics.throttled.metric.update(1);
- promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Ask
client to retry later"));
+ startTimeNanos,
promise));
}
return false;
}
/**
- * @param startTime the request start time
- * @return true if we exhausted the retries, false otherwise
+ * @param startTimeNanos the request start time in nanoseconds
+ * @param waitTimeNanos the estimated time to wait for the next available
permit in nanoseconds
+ *
+ * @return true if we exceeded timeout, false otherwise
+ * Note: for this check we take wait time for the request into
consideration
*/
- private boolean checkRetriesExhausted(Instant startTime)
+ private boolean isTimeoutExceeded(long startTimeNanos, long waitTimeNanos)
{
- return startTime.plus(Duration.ofSeconds(config.timeoutInSeconds()))
- .isBefore(Instant.now());
+ long nowNanos = System.nanoTime();
+ return startTimeNanos + waitTimeNanos +
TimeUnit.SECONDS.toNanos(config.timeoutInSeconds()) < nowNanos;
}
/**
diff --git
a/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java
b/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java
index afeb0a0..9ff27ea 100644
---
a/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java
+++
b/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java
@@ -18,6 +18,8 @@
package com.google.common.util.concurrent;
+import java.util.concurrent.TimeUnit;
+
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -37,13 +39,14 @@ class SidecarRateLimiterTest
assertThat(enabledRateLimiter.tryAcquire()).isTrue();
enabledRateLimiter.rate(150);
assertThat(enabledRateLimiter.rate()).isEqualTo(150);
-
assertThat(enabledRateLimiter.queryEarliestAvailable(0)).isGreaterThan(0);
+ enabledRateLimiter.acquire(200);
+
assertThat(enabledRateLimiter.queryWaitTimeInMicros()).isGreaterThan(0);
// Creates a SidecarRateLimiter that is disabled
SidecarRateLimiter disabledRateLimiter = SidecarRateLimiter.create(-1);
assertThat(disabledRateLimiter).isNotNull();
assertThat(disabledRateLimiter.rate()).isEqualTo(0);
-
assertThat(disabledRateLimiter.queryEarliestAvailable(1000L)).isEqualTo(0);
+ assertThat(disabledRateLimiter.queryWaitTimeInMicros()).isEqualTo(0);
}
@Test
@@ -83,4 +86,41 @@ class SidecarRateLimiterTest
assertThat(rateLimiter.acquire(5)).isEqualTo(0);
assertThat(rateLimiter.acquire(500)).isNotEqualTo(0);
}
+
+ @Test
+ void testWaitTimeReturned()
+ {
+ SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(10);
+
+ rateLimiter.acquire(10);
+
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(500000);
+
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ rateLimiter.acquire(100);
+
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(8000000);
+ }
+
+ @Test
+ void testClockResetWithRateUpdate()
+ {
+ SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(-1);
+ rateLimiter.acquire(2000);
+ assertThat(rateLimiter.queryWaitTimeInMicros()).isEqualTo(0);
+
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+ rateLimiter.rate(1);
+ rateLimiter.acquire(4);
+
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(3000000);
+ }
+
+ @Test
+ void testExhaustingPermits()
+ {
+ SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(1);
+ rateLimiter.acquire(20);
+
+
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(TimeUnit.SECONDS.toMicros(10));
+ }
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 49dbef3..afb94f7 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -104,7 +104,7 @@ public class TestModule extends AbstractModule
protected SidecarConfigurationImpl abstractConfig(SslConfiguration
sslConfiguration)
{
- ThrottleConfiguration throttleConfiguration = new
ThrottleConfigurationImpl(1, 10, 5);
+ ThrottleConfiguration throttleConfiguration = new
ThrottleConfigurationImpl(5, 5);
SSTableUploadConfiguration uploadConfiguration = new
SSTableUploadConfigurationImpl(0F);
SchemaKeyspaceConfiguration schemaKeyspaceConfiguration =
SchemaKeyspaceConfigurationImpl.builder()
diff --git a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
index 9627d37..f45deb4 100644
--- a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
@@ -18,9 +18,10 @@
package org.apache.cassandra.sidecar;
-import java.util.concurrent.CompletableFuture;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -29,11 +30,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.codahale.metrics.SharedMetricRegistries;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.codec.BodyCodec;
@@ -43,6 +47,7 @@ import
org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl;
import org.apache.cassandra.sidecar.metrics.instance.StreamSSTableMetrics;
import org.apache.cassandra.sidecar.server.MainModule;
import org.apache.cassandra.sidecar.server.Server;
+import org.assertj.core.data.Offset;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
@@ -76,8 +81,7 @@ public class ThrottleTest
void tearDown() throws InterruptedException
{
CountDownLatch closeLatch = new CountDownLatch(1);
- registry().removeMatching((name, metric) -> true);
- registry(1).removeMatching((name, metric) -> true);
+ SharedMetricRegistries.clear();
server.close().onSuccess(res -> closeLatch.countDown());
if (closeLatch.await(60, SECONDS))
logger.info("Close event received before timeout.");
@@ -86,48 +90,37 @@ public class ThrottleTest
}
@Test
- void testStreamRequestsThrottled() throws Exception
+ void testStreamRequestsThrottled(VertxTestContext context) throws Exception
{
String testRoute =
"/keyspaces/TestKeyspace/tables/TestTable-54ea95cebba24e0aa9bee428e5d7160b/snapshots"
+
"/TestSnapshot/components/nb-1-big-Data.db?dataDirectoryIndex=0";
- for (int i = 0; i < 20; i++)
- {
- unblockingClientRequest(testRoute);
- }
-
- StreamSSTableMetrics streamSSTableMetrics = new
InstanceMetricsImpl(registry(1)).streamSSTable();
-
- HttpResponse response = blockingClientRequest(testRoute);
-
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.TOO_MANY_REQUESTS.code());
-
assertThat(streamSSTableMetrics.throttled.metric.getValue()).isGreaterThanOrEqualTo(1);
-
- long secsToWait = Long.parseLong(response.getHeader("Retry-After"));
- Thread.sleep(SECONDS.toMillis(secsToWait));
-
- HttpResponse finalResp = blockingClientRequest(testRoute);
-
assertThat(finalResp.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
- assertThat(finalResp.bodyAsString()).isEqualTo("data");
- }
-
- private void unblockingClientRequest(String route)
- {
- WebClient client = WebClient.create(vertx);
- client.get(server.actualPort(), "localhost", "/api/v1" + route)
- .as(BodyCodec.buffer())
- .send(resp ->
- {
- // do nothing
- });
+ long startTime = System.nanoTime();
+ List<Future<HttpResponse<Buffer>>> responseFutures =
IntStream.range(0, 50)
+
.mapToObj(i -> sendRequest(testRoute))
+
.collect(Collectors.toList());
+
+ Future.join(responseFutures)
+ .onComplete(context.succeeding(combinedResp -> {
+ long elapsedNanos = System.nanoTime() - startTime;
+ long okResponse = responseFutures.stream()
+ .filter(resp ->
resp.result() != null && resp.result().statusCode() ==
HttpResponseStatus.OK.code())
+ .count();
+ double rate = okResponse * SECONDS.toNanos(1) / (double)
elapsedNanos;
+ assertThat(rate).as("Rate is expected to be 5 requests per
second. rate=" + rate + " RPS")
+ .isEqualTo(5D, Offset.offset(2D));
+ StreamSSTableMetrics streamSSTableMetrics = new
InstanceMetricsImpl(registry(1)).streamSSTable();
+
assertThat(streamSSTableMetrics.throttled.metric.getValue()).isGreaterThanOrEqualTo(5);
+ context.completeNow();
+ }));
+ context.awaitCompletion(30, SECONDS);
}
- private HttpResponse blockingClientRequest(String route) throws
ExecutionException, InterruptedException
+ private Future<HttpResponse<Buffer>> sendRequest(String route)
{
WebClient client = WebClient.create(vertx);
- CompletableFuture<HttpResponse> future = new CompletableFuture<>();
- client.get(server.actualPort(), "localhost", "/api/v1" + route)
- .as(BodyCodec.buffer())
- .send(resp -> future.complete(resp.result()));
- return future.get();
+ return client.get(server.actualPort(), "localhost", "/api/v1" + route)
+ .as(BodyCodec.buffer())
+ .send();
}
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
index ba8940f..ad8daae 100644
---
a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
@@ -394,7 +394,6 @@ class SidecarConfigurationTest
assertThat(throttle).isNotNull();
assertThat(throttle.rateLimitStreamRequestsPerSecond()).isEqualTo(5000);
- assertThat(throttle.delayInSeconds()).isEqualTo(5);
assertThat(throttle.timeoutInSeconds()).isEqualTo(10);
// validate traffic shaping options
diff --git a/src/test/resources/config/sidecar_file_permissions.yaml
b/src/test/resources/config/sidecar_file_permissions.yaml
index d1aeabe..7ab200e 100644
--- a/src/test/resources/config/sidecar_file_permissions.yaml
+++ b/src/test/resources/config/sidecar_file_permissions.yaml
@@ -70,7 +70,6 @@ sidecar:
request_timeout_millis: 300000
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
sstable_upload:
concurrent_upload_limit: 80
diff --git a/src/test/resources/config/sidecar_invalid_client_auth.yaml
b/src/test/resources/config/sidecar_invalid_client_auth.yaml
index 5ca9dfa..6bb88e2 100644
--- a/src/test/resources/config/sidecar_invalid_client_auth.yaml
+++ b/src/test/resources/config/sidecar_invalid_client_auth.yaml
@@ -72,7 +72,6 @@ sidecar:
accept_backlog: 1024
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
sstable_upload:
concurrent_upload_limit: 80
diff --git a/src/test/resources/config/sidecar_invalid_file_permissions.yaml
b/src/test/resources/config/sidecar_invalid_file_permissions.yaml
index ce192ce..81928a8 100644
--- a/src/test/resources/config/sidecar_invalid_file_permissions.yaml
+++ b/src/test/resources/config/sidecar_invalid_file_permissions.yaml
@@ -70,7 +70,6 @@ sidecar:
request_timeout_millis: 300000
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
sstable_upload:
concurrent_upload_limit: 80
diff --git a/src/test/resources/config/sidecar_metrics.yaml
b/src/test/resources/config/sidecar_metrics.yaml
index fb7636c..851e6b5 100644
--- a/src/test/resources/config/sidecar_metrics.yaml
+++ b/src/test/resources/config/sidecar_metrics.yaml
@@ -39,7 +39,6 @@ sidecar:
port: 0
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
allowable_time_skew_in_minutes: 60
jmx:
diff --git a/src/test/resources/config/sidecar_metrics_empty_filters.yaml
b/src/test/resources/config/sidecar_metrics_empty_filters.yaml
index d2af3b7..6aaa548 100644
--- a/src/test/resources/config/sidecar_metrics_empty_filters.yaml
+++ b/src/test/resources/config/sidecar_metrics_empty_filters.yaml
@@ -39,7 +39,6 @@ sidecar:
port: 0
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
allowable_time_skew_in_minutes: 60
jmx:
diff --git a/src/test/resources/config/sidecar_missing_jmx.yaml
b/src/test/resources/config/sidecar_missing_jmx.yaml
index 6e8a6db..b9193dd 100644
--- a/src/test/resources/config/sidecar_missing_jmx.yaml
+++ b/src/test/resources/config/sidecar_missing_jmx.yaml
@@ -73,7 +73,6 @@ sidecar:
server_verticle_instances: 1
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml
b/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml
index 5269370..7d9c6d5 100644
--- a/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml
+++ b/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml
@@ -73,7 +73,6 @@ sidecar:
server_verticle_instances: 1
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_multiple_instances.yaml
b/src/test/resources/config/sidecar_multiple_instances.yaml
index 9a43710..ffb1dcc 100644
--- a/src/test/resources/config/sidecar_multiple_instances.yaml
+++ b/src/test/resources/config/sidecar_multiple_instances.yaml
@@ -73,7 +73,6 @@ sidecar:
server_verticle_instances: 1
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_no_local_instances.yaml
b/src/test/resources/config/sidecar_no_local_instances.yaml
index 8c70e6d..83f0bd5 100644
--- a/src/test/resources/config/sidecar_no_local_instances.yaml
+++ b/src/test/resources/config/sidecar_no_local_instances.yaml
@@ -13,7 +13,6 @@ sidecar:
server_verticle_instances: 2
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 500
diff --git
a/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml
b/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml
index a1f45c3..443958e 100644
--- a/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml
+++ b/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml
@@ -26,7 +26,6 @@ sidecar:
server_verticle_instances: 2
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_single_instance.yaml
b/src/test/resources/config/sidecar_single_instance.yaml
index f47cce3..257865e 100644
--- a/src/test/resources/config/sidecar_single_instance.yaml
+++ b/src/test/resources/config/sidecar_single_instance.yaml
@@ -26,7 +26,6 @@ sidecar:
server_verticle_instances: 2
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_ssl.yaml
b/src/test/resources/config/sidecar_ssl.yaml
index c61b008..b2c8cb8 100644
--- a/src/test/resources/config/sidecar_ssl.yaml
+++ b/src/test/resources/config/sidecar_ssl.yaml
@@ -73,7 +73,6 @@ sidecar:
server_verticle_instances: 1
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
traffic_shaping:
inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_validation_configuration.yaml
b/src/test/resources/config/sidecar_validation_configuration.yaml
index 2d6364f..40ba742 100644
--- a/src/test/resources/config/sidecar_validation_configuration.yaml
+++ b/src/test/resources/config/sidecar_validation_configuration.yaml
@@ -18,7 +18,6 @@ sidecar:
request_timeout_millis: 1200000
throttle:
stream_requests_per_sec: 80
- delay_sec: 7
timeout_sec: 21
allowable_time_skew_in_minutes: 89
sstable_import:
diff --git
a/src/test/resources/config/sidecar_with_single_multiple_instances.yaml
b/src/test/resources/config/sidecar_with_single_multiple_instances.yaml
index a207884..40f1375 100644
--- a/src/test/resources/config/sidecar_with_single_multiple_instances.yaml
+++ b/src/test/resources/config/sidecar_with_single_multiple_instances.yaml
@@ -36,7 +36,6 @@ sidecar:
port: 9043
throttle:
stream_requests_per_sec: 5000
- delay_sec: 5
timeout_sec: 10
allowable_time_skew_in_minutes: 60
jmx:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]