This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 6b34e18 CASSANDRASC-124 Fix wait time acquired in SidecarRateLimiter (#115) 6b34e18 is described below commit 6b34e1883e69c0c1a3f260000777cdb0cfe91625 Author: Saranya Krishnakumar <sarany...@apple.com> AuthorDate: Tue Apr 30 20:02:55 2024 -0700 CASSANDRASC-124 Fix wait time acquired in SidecarRateLimiter (#115) Patch by Saranya Krishnakumar; Reviewed by Yifan Cai, Francisco Guerrero for CASSANDRASC-124 --- CHANGES.txt | 1 + src/main/dist/conf/sidecar.yaml | 1 - .../common/util/concurrent/SidecarRateLimiter.java | 76 +++++++++++++-------- .../sidecar/config/ThrottleConfiguration.java | 9 ++- .../config/yaml/ThrottleConfigurationImpl.java | 27 +++----- .../cassandra/sidecar/models/HttpResponse.java | 7 +- .../cassandra/sidecar/utils/FileStreamer.java | 79 +++++++++++----------- .../util/concurrent/SidecarRateLimiterTest.java | 44 +++++++++++- .../org/apache/cassandra/sidecar/TestModule.java | 2 +- .../org/apache/cassandra/sidecar/ThrottleTest.java | 71 +++++++++---------- .../sidecar/config/SidecarConfigurationTest.java | 1 - .../resources/config/sidecar_file_permissions.yaml | 1 - .../config/sidecar_invalid_client_auth.yaml | 1 - .../config/sidecar_invalid_file_permissions.yaml | 1 - src/test/resources/config/sidecar_metrics.yaml | 1 - .../config/sidecar_metrics_empty_filters.yaml | 1 - src/test/resources/config/sidecar_missing_jmx.yaml | 1 - .../config/sidecar_missing_sstable_snapshot.yaml | 1 - .../config/sidecar_multiple_instances.yaml | 1 - .../config/sidecar_no_local_instances.yaml | 1 - .../sidecar_schema_keyspace_configuration.yaml | 1 - .../resources/config/sidecar_single_instance.yaml | 1 - src/test/resources/config/sidecar_ssl.yaml | 1 - .../config/sidecar_validation_configuration.yaml | 1 - .../sidecar_with_single_multiple_instances.yaml | 1 - 25 files changed, 186 insertions(+), 146 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7968525..b2a3fd9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Fix wait time acquired in SidecarRateLimiter (CASSANDRASC-124) * Make RestoreJobDiscoverer less verbose (CASSANDRASC-126) * Import Queue pendingImports metrics is reporting an incorrect value (CASSANDRASC-125) * Add missing method to retrieve the InetSocketAddress to DriverUtils (CASSANDRASC-123) diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml index 734dcc4..31310c7 100644 --- a/src/main/dist/conf/sidecar.yaml +++ b/src/main/dist/conf/sidecar.yaml @@ -73,7 +73,6 @@ sidecar: server_verticle_instances: 1 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 0 # 0 implies unthrottled, the inbound bandwidth in bytes per second diff --git a/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java index b5abc14..3b025bf 100644 --- a/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java +++ b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java @@ -28,17 +28,16 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; * <p> * In addition to Guava's Rate Limiter functionality, it adds support for disabling rate-limiting. */ -@SuppressWarnings("UnstableApiUsage") public class SidecarRateLimiter { - private final AtomicReference<RateLimiter> ref = new AtomicReference<>(null); + private final AtomicReference<RateLimiterWrapper> ref = new AtomicReference<>(null); private SidecarRateLimiter(final double permitsPerSecond) { if (permitsPerSecond > 0) { - RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond); - ref.set(rateLimiter); + RateLimiterWrapper rateLimiterWrapper = RateLimiterWrapper.create(permitsPerSecond); + ref.set(rateLimiterWrapper); } } @@ -55,21 +54,16 @@ public class SidecarRateLimiter return new SidecarRateLimiter(permitsPerSecond); } - // Attention: Hack to expose the package private method queryEarliestAvailable - /** - * Returns the earliest time permits will become available. Returns 0 if disabled. - * - * <br><b>Note:</b> this is a hack to expose the package private method - * {@link RateLimiter#queryEarliestAvailable(long)} + * Returns calculated wait time in micros for next available permit. Permit is not reserved during calculation, + * this wait time is an approximation. * - * @param nowMicros current time in micros - * @return earliest time permits will become available + * @return approx wait time in micros for next available permit */ - public long queryEarliestAvailable(final long nowMicros) + public long queryWaitTimeInMicros() { - RateLimiter rateLimiter = ref.get(); - return rateLimiter != null ? rateLimiter.queryEarliestAvailable(nowMicros) : 0; + RateLimiterWrapper rateLimiterWrapper = ref.get(); + return rateLimiterWrapper != null ? rateLimiterWrapper.queryWaitTimeInMicros() : 0; } /** @@ -79,8 +73,8 @@ public class SidecarRateLimiter */ public boolean tryAcquire() { - RateLimiter rateLimiter = ref.get(); - return rateLimiter == null || rateLimiter.tryAcquire(); + RateLimiterWrapper rateLimiterWrapper = ref.get(); + return rateLimiterWrapper == null || rateLimiterWrapper.rateLimiter.tryAcquire(); } /** @@ -93,17 +87,17 @@ public class SidecarRateLimiter */ public void rate(double permitsPerSecond) { - RateLimiter rateLimiter = ref.get(); + RateLimiterWrapper rateLimiterWrapper = ref.get(); if (permitsPerSecond > 0.0) { - if (rateLimiter == null) + if (rateLimiterWrapper == null) { - ref.compareAndSet(null, RateLimiter.create(permitsPerSecond)); + ref.compareAndSet(null, RateLimiterWrapper.create(permitsPerSecond)); } else { - rateLimiter.setRate(permitsPerSecond); + rateLimiterWrapper.rateLimiter.setRate(permitsPerSecond); } } else @@ -123,8 +117,8 @@ public class SidecarRateLimiter */ public double rate() { - RateLimiter rateLimiter = ref.get(); - return rateLimiter != null ? rateLimiter.getRate() : 0; + RateLimiterWrapper rateLimiterWrapper = ref.get(); + return rateLimiterWrapper != null ? rateLimiterWrapper.rateLimiter.getRate() : 0; } /** @@ -139,8 +133,8 @@ public class SidecarRateLimiter @CanIgnoreReturnValue public double acquire() { - RateLimiter rateLimiter = ref.get(); - return rateLimiter != null ? rateLimiter.acquire() : 0; + RateLimiterWrapper rateLimiterWrapper = ref.get(); + return rateLimiterWrapper != null ? rateLimiterWrapper.rateLimiter.acquire() : 0; } /** @@ -156,7 +150,35 @@ public class SidecarRateLimiter @CanIgnoreReturnValue public double acquire(int permits) { - RateLimiter rateLimiter = ref.get(); - return rateLimiter != null && permits > 0 ? rateLimiter.acquire(permits) : 0; + RateLimiterWrapper rateLimiterWrapper = ref.get(); + return rateLimiterWrapper != null && permits > 0 ? rateLimiterWrapper.rateLimiter.acquire(permits) : 0; + } + + // Attention: Hack to expose the package private method queryEarliestAvailable and RateLimiter.SleepingStopwatch + @SuppressWarnings("UnstableApiUsage") + private static class RateLimiterWrapper + { + private final RateLimiter rateLimiter; + private final RateLimiter.SleepingStopwatch stopwatch; + + private RateLimiterWrapper(RateLimiter rateLimiter, RateLimiter.SleepingStopwatch stopwatch) + { + this.rateLimiter = rateLimiter; + this.stopwatch = stopwatch; + } + + static RateLimiterWrapper create(double permitsPerSecond) + { + RateLimiter.SleepingStopwatch stopwatch = RateLimiter.SleepingStopwatch.createFromSystemTimer(); + RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond, stopwatch); + return new RateLimiterWrapper(rateLimiter, stopwatch); + } + + public long queryWaitTimeInMicros() + { + long earliestAvailableMicros = rateLimiter.queryEarliestAvailable(0); + long nowMicros = stopwatch.readMicros(); + return Math.max(earliestAvailableMicros - nowMicros, 0); + } } } diff --git a/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java index e4c8af7..0447d35 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java @@ -23,9 +23,14 @@ package org.apache.cassandra.sidecar.config; */ public interface ThrottleConfiguration { + /** + * @return rate at which stream requests will be accepted. + */ long rateLimitStreamRequestsPerSecond(); + /** + * @return timeout in seconds used to determine when to stop retrying stream requests when stream requests + * are throttled. + */ long timeoutInSeconds(); - - long delayInSeconds(); } diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java index 24f45ce..b755c95 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java @@ -28,41 +28,36 @@ public class ThrottleConfigurationImpl implements ThrottleConfiguration { public static final long DEFAULT_STREAM_REQUESTS_PER_SEC = 5000; public static final long DEFAULT_TIMEOUT_SEC = 10; - public static final long DEFAULT_DELAY_SEC = 5; public static final String STREAM_REQUESTS_PER_SEC_PROPERTY = "stream_requests_per_sec"; public static final String TIMEOUT_SEC_PROPERTY = "timeout_sec"; - public static final String DELAY_SEC_PROPERTY = "delay_sec"; @JsonProperty(value = STREAM_REQUESTS_PER_SEC_PROPERTY) protected final long rateLimitStreamRequestsPerSecond; @JsonProperty(value = TIMEOUT_SEC_PROPERTY) protected final long timeoutInSeconds; - @JsonProperty(value = DELAY_SEC_PROPERTY) - protected final long delayInSeconds; public ThrottleConfigurationImpl() { this(DEFAULT_STREAM_REQUESTS_PER_SEC, - DEFAULT_TIMEOUT_SEC, - DEFAULT_DELAY_SEC); + DEFAULT_TIMEOUT_SEC); } public ThrottleConfigurationImpl(long rateLimitStreamRequestsPerSecond) { this(rateLimitStreamRequestsPerSecond, - DEFAULT_TIMEOUT_SEC, - DEFAULT_DELAY_SEC); + DEFAULT_TIMEOUT_SEC); } public ThrottleConfigurationImpl(long rateLimitStreamRequestsPerSecond, - long timeoutInSeconds, - long delayInSeconds) + long timeoutInSeconds) { this.rateLimitStreamRequestsPerSecond = rateLimitStreamRequestsPerSecond; this.timeoutInSeconds = timeoutInSeconds; - this.delayInSeconds = delayInSeconds; } + /** + * {@inheritDoc} + */ @Override @JsonProperty(value = STREAM_REQUESTS_PER_SEC_PROPERTY) public long rateLimitStreamRequestsPerSecond() @@ -70,17 +65,13 @@ public class ThrottleConfigurationImpl implements ThrottleConfiguration return rateLimitStreamRequestsPerSecond; } + /** + * {@inheritDoc} + */ @Override @JsonProperty(value = TIMEOUT_SEC_PROPERTY) public long timeoutInSeconds() { return timeoutInSeconds; } - - @Override - @JsonProperty(value = DELAY_SEC_PROPERTY) - public long delayInSeconds() - { - return delayInSeconds; - } } diff --git a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java index 9b798f6..960d116 100644 --- a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java +++ b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java @@ -28,7 +28,7 @@ import io.vertx.core.http.HttpServerResponse; import io.vertx.core.net.SocketAddress; import org.apache.cassandra.sidecar.common.utils.HttpRange; -import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; /** * Wrapper around HttpServerResponse @@ -46,10 +46,11 @@ public class HttpResponse this.host = request.host(); } - public void setRetryAfterHeader(long microsToWait) + public void setRetryAfterHeader(long waitTimeNanos) { response.setStatusCode(HttpResponseStatus.TOO_MANY_REQUESTS.code()); - response.putHeader(HttpHeaderNames.RETRY_AFTER, Long.toString(MICROSECONDS.toSeconds(microsToWait))).end(); + // round up when converting to second value + response.putHeader(HttpHeaderNames.RETRY_AFTER, Long.toString(NANOSECONDS.toSeconds(waitTimeNanos) + 1)).end(); } public void setTooManyRequestsStatus() diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java index eb1b129..1825863 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java @@ -18,8 +18,6 @@ package org.apache.cassandra.sidecar.utils; -import java.time.Duration; -import java.time.Instant; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.SidecarRateLimiter; @@ -103,7 +101,14 @@ public class FileStreamer public Future<Void> stream(HttpResponse response, int instanceId, String filename, long fileLength, HttpRange range) { Promise<Void> promise = Promise.promise(); - acquireAndSend(response, instanceId, filename, fileLength, range, Instant.now(), promise); + try + { + acquireAndSend(response, instanceId, filename, fileLength, range, System.nanoTime(), promise); + } + catch (Throwable t) + { + promise.tryFail(t); + } return promise.future(); } @@ -111,25 +116,25 @@ public class FileStreamer * Send the file if rate-limiting is disabled or when it successfully acquires a permit from the * {@link SidecarRateLimiter}. * - * @param response the response to use - * @param instanceId Cassandra instance from which file is streamed - * @param filename the path to the file to serve - * @param fileLength the size of the file to serve - * @param range the range to stream - * @param startTime the start time of this request - * @param promise a promise for the stream + * @param response the response to use + * @param instanceId Cassandra instance from which file is streamed + * @param filename the path to the file to serve + * @param fileLength the size of the file to serve + * @param range the range to stream + * @param startTimeNanos the start time of this request + * @param promise a promise for the stream */ private void acquireAndSend(HttpResponse response, int instanceId, String filename, long fileLength, HttpRange range, - Instant startTime, + long startTimeNanos, Promise<Void> promise) { InstanceMetrics instanceMetrics = instanceMetadataFetcher.instance(instanceId).metrics(); StreamSSTableMetrics streamSSTableMetrics = instanceMetrics.streamSSTable(); - if (acquire(response, instanceId, filename, fileLength, range, startTime, streamSSTableMetrics, promise)) + if (acquire(response, instanceId, filename, fileLength, range, startTimeNanos, streamSSTableMetrics, promise)) { // Stream data if rate limiting is disabled or if we acquire LOGGER.debug("Streaming range {} for file {} to client {}. Instance: {}", range, filename, @@ -158,55 +163,53 @@ public class FileStreamer * @param filename the path to the file to serve * @param fileLength the size of the file to serve * @param range the range to stream - * @param startTime the start time of this request + * @param startTimeNanos the start time of this request * @param streamSSTableMetrics metrics captured during streaming of SSTables * @param promise a promise for the stream * @return {@code true} if the permit was acquired, {@code false} otherwise */ private boolean acquire(HttpResponse response, int instanceId, String filename, long fileLength, HttpRange range, - Instant startTime, StreamSSTableMetrics streamSSTableMetrics, Promise<Void> promise) + long startTimeNanos, StreamSSTableMetrics streamSSTableMetrics, Promise<Void> promise) { if (rateLimiter.tryAcquire()) return true; - long microsToWait; - if (checkRetriesExhausted(startTime)) + long waitTimeNanos = MICROSECONDS.toNanos(rateLimiter.queryWaitTimeInMicros()); + if (isTimeoutExceeded(startTimeNanos, waitTimeNanos)) { - LOGGER.error("Retries for acquiring permit exhausted for client {}. Instance: {}", response.remoteAddress(), - response.host()); + LOGGER.warn("Retries for acquiring permit exhausted for client {}. Instance: {}. " + + "Asking client to retry after {} nanoseconds.", response.remoteAddress(), response.host(), + waitTimeNanos); + response.setRetryAfterHeader(waitTimeNanos); + streamSSTableMetrics.throttled.metric.update(1); promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Retry exhausted")); } - else if ((microsToWait = rateLimiter.queryEarliestAvailable(0L)) - < TimeUnit.SECONDS.toMicros(config.delayInSeconds())) + else { - microsToWait = Math.max(0, microsToWait); - - LOGGER.debug("Retrying streaming after {} micros for client {}. Instance: {}", microsToWait, + LOGGER.debug("Retrying streaming after {} nanos for client {}. Instance: {}", waitTimeNanos, response.remoteAddress(), response.host()); executorPools.service() - .setTimer(MICROSECONDS.toMillis(microsToWait), + // Note: adding an extra millisecond is required for 2 reasons + // 1. setTimer does not like scheduling with 0 delay; it throws + // 2. the retry should be scheduled later than the waitTimeNanos, in order to ensure it can acquire + .setTimer(TimeUnit.NANOSECONDS.toMillis(waitTimeNanos) + 1, t -> acquireAndSend(response, instanceId, filename, fileLength, range, - startTime, promise)); - } - else - { - LOGGER.debug("Asking client {} to retry after {} micros. Instance: {}", response.remoteAddress(), - microsToWait, response.host()); - response.setRetryAfterHeader(microsToWait); - streamSSTableMetrics.throttled.metric.update(1); - promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Ask client to retry later")); + startTimeNanos, promise)); } return false; } /** - * @param startTime the request start time - * @return true if we exhausted the retries, false otherwise + * @param startTimeNanos the request start time in nanoseconds + * @param waitTimeNanos the estimated time to wait for the next available permit in nanoseconds + * + * @return true if we exceeded timeout, false otherwise + * Note: for this check we take wait time for the request into consideration */ - private boolean checkRetriesExhausted(Instant startTime) + private boolean isTimeoutExceeded(long startTimeNanos, long waitTimeNanos) { - return startTime.plus(Duration.ofSeconds(config.timeoutInSeconds())) - .isBefore(Instant.now()); + long nowNanos = System.nanoTime(); + return startTimeNanos + waitTimeNanos + TimeUnit.SECONDS.toNanos(config.timeoutInSeconds()) < nowNanos; } /** diff --git a/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java b/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java index afeb0a0..9ff27ea 100644 --- a/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java +++ b/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java @@ -18,6 +18,8 @@ package com.google.common.util.concurrent; +import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -37,13 +39,14 @@ class SidecarRateLimiterTest assertThat(enabledRateLimiter.tryAcquire()).isTrue(); enabledRateLimiter.rate(150); assertThat(enabledRateLimiter.rate()).isEqualTo(150); - assertThat(enabledRateLimiter.queryEarliestAvailable(0)).isGreaterThan(0); + enabledRateLimiter.acquire(200); + assertThat(enabledRateLimiter.queryWaitTimeInMicros()).isGreaterThan(0); // Creates a SidecarRateLimiter that is disabled SidecarRateLimiter disabledRateLimiter = SidecarRateLimiter.create(-1); assertThat(disabledRateLimiter).isNotNull(); assertThat(disabledRateLimiter.rate()).isEqualTo(0); - assertThat(disabledRateLimiter.queryEarliestAvailable(1000L)).isEqualTo(0); + assertThat(disabledRateLimiter.queryWaitTimeInMicros()).isEqualTo(0); } @Test @@ -83,4 +86,41 @@ class SidecarRateLimiterTest assertThat(rateLimiter.acquire(5)).isEqualTo(0); assertThat(rateLimiter.acquire(500)).isNotEqualTo(0); } + + @Test + void testWaitTimeReturned() + { + SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(10); + + rateLimiter.acquire(10); + assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(500000); + + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + rateLimiter.acquire(100); + assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(8000000); + } + + @Test + void testClockResetWithRateUpdate() + { + SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(-1); + rateLimiter.acquire(2000); + assertThat(rateLimiter.queryWaitTimeInMicros()).isEqualTo(0); + + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + + rateLimiter.rate(1); + rateLimiter.acquire(4); + assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(3000000); + } + + @Test + void testExhaustingPermits() + { + SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(1); + rateLimiter.acquire(20); + + assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(TimeUnit.SECONDS.toMicros(10)); + } } diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java index 49dbef3..afb94f7 100644 --- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java +++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java @@ -104,7 +104,7 @@ public class TestModule extends AbstractModule protected SidecarConfigurationImpl abstractConfig(SslConfiguration sslConfiguration) { - ThrottleConfiguration throttleConfiguration = new ThrottleConfigurationImpl(1, 10, 5); + ThrottleConfiguration throttleConfiguration = new ThrottleConfigurationImpl(5, 5); SSTableUploadConfiguration uploadConfiguration = new SSTableUploadConfigurationImpl(0F); SchemaKeyspaceConfiguration schemaKeyspaceConfiguration = SchemaKeyspaceConfigurationImpl.builder() diff --git a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java index 9627d37..f45deb4 100644 --- a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java @@ -18,9 +18,10 @@ package org.apache.cassandra.sidecar; -import java.util.concurrent.CompletableFuture; +import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -29,11 +30,14 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.SharedMetricRegistries; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.util.Modules; import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.codec.BodyCodec; @@ -43,6 +47,7 @@ import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl; import org.apache.cassandra.sidecar.metrics.instance.StreamSSTableMetrics; import org.apache.cassandra.sidecar.server.MainModule; import org.apache.cassandra.sidecar.server.Server; +import org.assertj.core.data.Offset; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; @@ -76,8 +81,7 @@ public class ThrottleTest void tearDown() throws InterruptedException { CountDownLatch closeLatch = new CountDownLatch(1); - registry().removeMatching((name, metric) -> true); - registry(1).removeMatching((name, metric) -> true); + SharedMetricRegistries.clear(); server.close().onSuccess(res -> closeLatch.countDown()); if (closeLatch.await(60, SECONDS)) logger.info("Close event received before timeout."); @@ -86,48 +90,37 @@ public class ThrottleTest } @Test - void testStreamRequestsThrottled() throws Exception + void testStreamRequestsThrottled(VertxTestContext context) throws Exception { String testRoute = "/keyspaces/TestKeyspace/tables/TestTable-54ea95cebba24e0aa9bee428e5d7160b/snapshots" + "/TestSnapshot/components/nb-1-big-Data.db?dataDirectoryIndex=0"; - for (int i = 0; i < 20; i++) - { - unblockingClientRequest(testRoute); - } - - StreamSSTableMetrics streamSSTableMetrics = new InstanceMetricsImpl(registry(1)).streamSSTable(); - - HttpResponse response = blockingClientRequest(testRoute); - assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.TOO_MANY_REQUESTS.code()); - assertThat(streamSSTableMetrics.throttled.metric.getValue()).isGreaterThanOrEqualTo(1); - - long secsToWait = Long.parseLong(response.getHeader("Retry-After")); - Thread.sleep(SECONDS.toMillis(secsToWait)); - - HttpResponse finalResp = blockingClientRequest(testRoute); - assertThat(finalResp.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); - assertThat(finalResp.bodyAsString()).isEqualTo("data"); - } - - private void unblockingClientRequest(String route) - { - WebClient client = WebClient.create(vertx); - client.get(server.actualPort(), "localhost", "/api/v1" + route) - .as(BodyCodec.buffer()) - .send(resp -> - { - // do nothing - }); + long startTime = System.nanoTime(); + List<Future<HttpResponse<Buffer>>> responseFutures = IntStream.range(0, 50) + .mapToObj(i -> sendRequest(testRoute)) + .collect(Collectors.toList()); + + Future.join(responseFutures) + .onComplete(context.succeeding(combinedResp -> { + long elapsedNanos = System.nanoTime() - startTime; + long okResponse = responseFutures.stream() + .filter(resp -> resp.result() != null && resp.result().statusCode() == HttpResponseStatus.OK.code()) + .count(); + double rate = okResponse * SECONDS.toNanos(1) / (double) elapsedNanos; + assertThat(rate).as("Rate is expected to be 5 requests per second. rate=" + rate + " RPS") + .isEqualTo(5D, Offset.offset(2D)); + StreamSSTableMetrics streamSSTableMetrics = new InstanceMetricsImpl(registry(1)).streamSSTable(); + assertThat(streamSSTableMetrics.throttled.metric.getValue()).isGreaterThanOrEqualTo(5); + context.completeNow(); + })); + context.awaitCompletion(30, SECONDS); } - private HttpResponse blockingClientRequest(String route) throws ExecutionException, InterruptedException + private Future<HttpResponse<Buffer>> sendRequest(String route) { WebClient client = WebClient.create(vertx); - CompletableFuture<HttpResponse> future = new CompletableFuture<>(); - client.get(server.actualPort(), "localhost", "/api/v1" + route) - .as(BodyCodec.buffer()) - .send(resp -> future.complete(resp.result())); - return future.get(); + return client.get(server.actualPort(), "localhost", "/api/v1" + route) + .as(BodyCodec.buffer()) + .send(); } } diff --git a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java index ba8940f..ad8daae 100644 --- a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java @@ -394,7 +394,6 @@ class SidecarConfigurationTest assertThat(throttle).isNotNull(); assertThat(throttle.rateLimitStreamRequestsPerSecond()).isEqualTo(5000); - assertThat(throttle.delayInSeconds()).isEqualTo(5); assertThat(throttle.timeoutInSeconds()).isEqualTo(10); // validate traffic shaping options diff --git a/src/test/resources/config/sidecar_file_permissions.yaml b/src/test/resources/config/sidecar_file_permissions.yaml index d1aeabe..7ab200e 100644 --- a/src/test/resources/config/sidecar_file_permissions.yaml +++ b/src/test/resources/config/sidecar_file_permissions.yaml @@ -70,7 +70,6 @@ sidecar: request_timeout_millis: 300000 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 sstable_upload: concurrent_upload_limit: 80 diff --git a/src/test/resources/config/sidecar_invalid_client_auth.yaml b/src/test/resources/config/sidecar_invalid_client_auth.yaml index 5ca9dfa..6bb88e2 100644 --- a/src/test/resources/config/sidecar_invalid_client_auth.yaml +++ b/src/test/resources/config/sidecar_invalid_client_auth.yaml @@ -72,7 +72,6 @@ sidecar: accept_backlog: 1024 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 sstable_upload: concurrent_upload_limit: 80 diff --git a/src/test/resources/config/sidecar_invalid_file_permissions.yaml b/src/test/resources/config/sidecar_invalid_file_permissions.yaml index ce192ce..81928a8 100644 --- a/src/test/resources/config/sidecar_invalid_file_permissions.yaml +++ b/src/test/resources/config/sidecar_invalid_file_permissions.yaml @@ -70,7 +70,6 @@ sidecar: request_timeout_millis: 300000 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 sstable_upload: concurrent_upload_limit: 80 diff --git a/src/test/resources/config/sidecar_metrics.yaml b/src/test/resources/config/sidecar_metrics.yaml index fb7636c..851e6b5 100644 --- a/src/test/resources/config/sidecar_metrics.yaml +++ b/src/test/resources/config/sidecar_metrics.yaml @@ -39,7 +39,6 @@ sidecar: port: 0 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 allowable_time_skew_in_minutes: 60 jmx: diff --git a/src/test/resources/config/sidecar_metrics_empty_filters.yaml b/src/test/resources/config/sidecar_metrics_empty_filters.yaml index d2af3b7..6aaa548 100644 --- a/src/test/resources/config/sidecar_metrics_empty_filters.yaml +++ b/src/test/resources/config/sidecar_metrics_empty_filters.yaml @@ -39,7 +39,6 @@ sidecar: port: 0 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 allowable_time_skew_in_minutes: 60 jmx: diff --git a/src/test/resources/config/sidecar_missing_jmx.yaml b/src/test/resources/config/sidecar_missing_jmx.yaml index 6e8a6db..b9193dd 100644 --- a/src/test/resources/config/sidecar_missing_jmx.yaml +++ b/src/test/resources/config/sidecar_missing_jmx.yaml @@ -73,7 +73,6 @@ sidecar: server_verticle_instances: 1 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 500 diff --git a/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml b/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml index 5269370..7d9c6d5 100644 --- a/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml +++ b/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml @@ -73,7 +73,6 @@ sidecar: server_verticle_instances: 1 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 500 diff --git a/src/test/resources/config/sidecar_multiple_instances.yaml b/src/test/resources/config/sidecar_multiple_instances.yaml index 9a43710..ffb1dcc 100644 --- a/src/test/resources/config/sidecar_multiple_instances.yaml +++ b/src/test/resources/config/sidecar_multiple_instances.yaml @@ -73,7 +73,6 @@ sidecar: server_verticle_instances: 1 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 500 diff --git a/src/test/resources/config/sidecar_no_local_instances.yaml b/src/test/resources/config/sidecar_no_local_instances.yaml index 8c70e6d..83f0bd5 100644 --- a/src/test/resources/config/sidecar_no_local_instances.yaml +++ b/src/test/resources/config/sidecar_no_local_instances.yaml @@ -13,7 +13,6 @@ sidecar: server_verticle_instances: 2 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 500 diff --git a/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml b/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml index a1f45c3..443958e 100644 --- a/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml +++ b/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml @@ -26,7 +26,6 @@ sidecar: server_verticle_instances: 2 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 500 diff --git a/src/test/resources/config/sidecar_single_instance.yaml b/src/test/resources/config/sidecar_single_instance.yaml index f47cce3..257865e 100644 --- a/src/test/resources/config/sidecar_single_instance.yaml +++ b/src/test/resources/config/sidecar_single_instance.yaml @@ -26,7 +26,6 @@ sidecar: server_verticle_instances: 2 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 500 diff --git a/src/test/resources/config/sidecar_ssl.yaml b/src/test/resources/config/sidecar_ssl.yaml index c61b008..b2c8cb8 100644 --- a/src/test/resources/config/sidecar_ssl.yaml +++ b/src/test/resources/config/sidecar_ssl.yaml @@ -73,7 +73,6 @@ sidecar: server_verticle_instances: 1 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 traffic_shaping: inbound_global_bandwidth_bps: 500 diff --git a/src/test/resources/config/sidecar_validation_configuration.yaml b/src/test/resources/config/sidecar_validation_configuration.yaml index 2d6364f..40ba742 100644 --- a/src/test/resources/config/sidecar_validation_configuration.yaml +++ b/src/test/resources/config/sidecar_validation_configuration.yaml @@ -18,7 +18,6 @@ sidecar: request_timeout_millis: 1200000 throttle: stream_requests_per_sec: 80 - delay_sec: 7 timeout_sec: 21 allowable_time_skew_in_minutes: 89 sstable_import: diff --git a/src/test/resources/config/sidecar_with_single_multiple_instances.yaml b/src/test/resources/config/sidecar_with_single_multiple_instances.yaml index a207884..40f1375 100644 --- a/src/test/resources/config/sidecar_with_single_multiple_instances.yaml +++ b/src/test/resources/config/sidecar_with_single_multiple_instances.yaml @@ -36,7 +36,6 @@ sidecar: port: 9043 throttle: stream_requests_per_sec: 5000 - delay_sec: 5 timeout_sec: 10 allowable_time_skew_in_minutes: 60 jmx: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org