This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b34e18  CASSANDRASC-124 Fix wait time acquired in SidecarRateLimiter 
(#115)
6b34e18 is described below

commit 6b34e1883e69c0c1a3f260000777cdb0cfe91625
Author: Saranya Krishnakumar <sarany...@apple.com>
AuthorDate: Tue Apr 30 20:02:55 2024 -0700

    CASSANDRASC-124 Fix wait time acquired in SidecarRateLimiter (#115)
    
    
    Patch by Saranya Krishnakumar; Reviewed by Yifan Cai, Francisco Guerrero 
for CASSANDRASC-124
---
 CHANGES.txt                                        |  1 +
 src/main/dist/conf/sidecar.yaml                    |  1 -
 .../common/util/concurrent/SidecarRateLimiter.java | 76 +++++++++++++--------
 .../sidecar/config/ThrottleConfiguration.java      |  9 ++-
 .../config/yaml/ThrottleConfigurationImpl.java     | 27 +++-----
 .../cassandra/sidecar/models/HttpResponse.java     |  7 +-
 .../cassandra/sidecar/utils/FileStreamer.java      | 79 +++++++++++-----------
 .../util/concurrent/SidecarRateLimiterTest.java    | 44 +++++++++++-
 .../org/apache/cassandra/sidecar/TestModule.java   |  2 +-
 .../org/apache/cassandra/sidecar/ThrottleTest.java | 71 +++++++++----------
 .../sidecar/config/SidecarConfigurationTest.java   |  1 -
 .../resources/config/sidecar_file_permissions.yaml |  1 -
 .../config/sidecar_invalid_client_auth.yaml        |  1 -
 .../config/sidecar_invalid_file_permissions.yaml   |  1 -
 src/test/resources/config/sidecar_metrics.yaml     |  1 -
 .../config/sidecar_metrics_empty_filters.yaml      |  1 -
 src/test/resources/config/sidecar_missing_jmx.yaml |  1 -
 .../config/sidecar_missing_sstable_snapshot.yaml   |  1 -
 .../config/sidecar_multiple_instances.yaml         |  1 -
 .../config/sidecar_no_local_instances.yaml         |  1 -
 .../sidecar_schema_keyspace_configuration.yaml     |  1 -
 .../resources/config/sidecar_single_instance.yaml  |  1 -
 src/test/resources/config/sidecar_ssl.yaml         |  1 -
 .../config/sidecar_validation_configuration.yaml   |  1 -
 .../sidecar_with_single_multiple_instances.yaml    |  1 -
 25 files changed, 186 insertions(+), 146 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7968525..b2a3fd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Fix wait time acquired in SidecarRateLimiter (CASSANDRASC-124)
  * Make RestoreJobDiscoverer less verbose (CASSANDRASC-126)
  * Import Queue pendingImports metrics is reporting an incorrect value 
(CASSANDRASC-125)
  * Add missing method to retrieve the InetSocketAddress to DriverUtils 
(CASSANDRASC-123)
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index 734dcc4..31310c7 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -73,7 +73,6 @@ sidecar:
   server_verticle_instances: 1
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 0               # 0 implies unthrottled, the 
inbound bandwidth in bytes per second
diff --git 
a/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java 
b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
index b5abc14..3b025bf 100644
--- a/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
+++ b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
@@ -28,17 +28,16 @@ import 
com.google.errorprone.annotations.CanIgnoreReturnValue;
  * <p>
  * In addition to Guava's Rate Limiter functionality, it adds support for 
disabling rate-limiting.
  */
-@SuppressWarnings("UnstableApiUsage")
 public class SidecarRateLimiter
 {
-    private final AtomicReference<RateLimiter> ref = new 
AtomicReference<>(null);
+    private final AtomicReference<RateLimiterWrapper> ref = new 
AtomicReference<>(null);
 
     private SidecarRateLimiter(final double permitsPerSecond)
     {
         if (permitsPerSecond > 0)
         {
-            RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);
-            ref.set(rateLimiter);
+            RateLimiterWrapper rateLimiterWrapper = 
RateLimiterWrapper.create(permitsPerSecond);
+            ref.set(rateLimiterWrapper);
         }
     }
 
@@ -55,21 +54,16 @@ public class SidecarRateLimiter
         return new SidecarRateLimiter(permitsPerSecond);
     }
 
-    // Attention: Hack to expose the package private method 
queryEarliestAvailable
-
     /**
-     * Returns the earliest time permits will become available. Returns 0 if 
disabled.
-     *
-     * <br><b>Note:</b> this is a hack to expose the package private method
-     * {@link RateLimiter#queryEarliestAvailable(long)}
+     * Returns calculated wait time in micros for next available permit. 
Permit is not reserved during calculation,
+     * this wait time is an approximation.
      *
-     * @param nowMicros current time in micros
-     * @return earliest time permits will become available
+     * @return approx wait time in micros for next available permit
      */
-    public long queryEarliestAvailable(final long nowMicros)
+    public long queryWaitTimeInMicros()
     {
-        RateLimiter rateLimiter = ref.get();
-        return rateLimiter != null ? 
rateLimiter.queryEarliestAvailable(nowMicros) : 0;
+        RateLimiterWrapper rateLimiterWrapper = ref.get();
+        return rateLimiterWrapper != null ? 
rateLimiterWrapper.queryWaitTimeInMicros() : 0;
     }
 
     /**
@@ -79,8 +73,8 @@ public class SidecarRateLimiter
      */
     public boolean tryAcquire()
     {
-        RateLimiter rateLimiter = ref.get();
-        return rateLimiter == null || rateLimiter.tryAcquire();
+        RateLimiterWrapper rateLimiterWrapper = ref.get();
+        return rateLimiterWrapper == null || 
rateLimiterWrapper.rateLimiter.tryAcquire();
     }
 
     /**
@@ -93,17 +87,17 @@ public class SidecarRateLimiter
      */
     public void rate(double permitsPerSecond)
     {
-        RateLimiter rateLimiter = ref.get();
+        RateLimiterWrapper rateLimiterWrapper = ref.get();
 
         if (permitsPerSecond > 0.0)
         {
-            if (rateLimiter == null)
+            if (rateLimiterWrapper == null)
             {
-                ref.compareAndSet(null, RateLimiter.create(permitsPerSecond));
+                ref.compareAndSet(null, 
RateLimiterWrapper.create(permitsPerSecond));
             }
             else
             {
-                rateLimiter.setRate(permitsPerSecond);
+                rateLimiterWrapper.rateLimiter.setRate(permitsPerSecond);
             }
         }
         else
@@ -123,8 +117,8 @@ public class SidecarRateLimiter
      */
     public double rate()
     {
-        RateLimiter rateLimiter = ref.get();
-        return rateLimiter != null ? rateLimiter.getRate() : 0;
+        RateLimiterWrapper rateLimiterWrapper = ref.get();
+        return rateLimiterWrapper != null ? 
rateLimiterWrapper.rateLimiter.getRate() : 0;
     }
 
     /**
@@ -139,8 +133,8 @@ public class SidecarRateLimiter
     @CanIgnoreReturnValue
     public double acquire()
     {
-        RateLimiter rateLimiter = ref.get();
-        return rateLimiter != null ? rateLimiter.acquire() : 0;
+        RateLimiterWrapper rateLimiterWrapper = ref.get();
+        return rateLimiterWrapper != null ? 
rateLimiterWrapper.rateLimiter.acquire() : 0;
     }
 
     /**
@@ -156,7 +150,35 @@ public class SidecarRateLimiter
     @CanIgnoreReturnValue
     public double acquire(int permits)
     {
-        RateLimiter rateLimiter = ref.get();
-        return rateLimiter != null && permits > 0 ? 
rateLimiter.acquire(permits) : 0;
+        RateLimiterWrapper rateLimiterWrapper = ref.get();
+        return rateLimiterWrapper != null && permits > 0 ? 
rateLimiterWrapper.rateLimiter.acquire(permits) : 0;
+    }
+
+    // Attention: Hack to expose the package private method 
queryEarliestAvailable and RateLimiter.SleepingStopwatch
+    @SuppressWarnings("UnstableApiUsage")
+    private static class RateLimiterWrapper
+    {
+        private final RateLimiter rateLimiter;
+        private final RateLimiter.SleepingStopwatch stopwatch;
+
+        private RateLimiterWrapper(RateLimiter rateLimiter, 
RateLimiter.SleepingStopwatch stopwatch)
+        {
+            this.rateLimiter = rateLimiter;
+            this.stopwatch = stopwatch;
+        }
+
+        static RateLimiterWrapper create(double permitsPerSecond)
+        {
+            RateLimiter.SleepingStopwatch stopwatch = 
RateLimiter.SleepingStopwatch.createFromSystemTimer();
+            RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond, 
stopwatch);
+            return new RateLimiterWrapper(rateLimiter, stopwatch);
+        }
+
+        public long queryWaitTimeInMicros()
+        {
+            long earliestAvailableMicros = 
rateLimiter.queryEarliestAvailable(0);
+            long nowMicros = stopwatch.readMicros();
+            return Math.max(earliestAvailableMicros - nowMicros, 0);
+        }
     }
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java 
b/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java
index e4c8af7..0447d35 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/config/ThrottleConfiguration.java
@@ -23,9 +23,14 @@ package org.apache.cassandra.sidecar.config;
  */
 public interface ThrottleConfiguration
 {
+    /**
+     * @return rate at which stream requests will be accepted.
+     */
     long rateLimitStreamRequestsPerSecond();
 
+    /**
+     * @return timeout in seconds used to determine when to stop retrying 
stream requests when stream requests
+     * are throttled.
+     */
     long timeoutInSeconds();
-
-    long delayInSeconds();
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
 
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
index 24f45ce..b755c95 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/ThrottleConfigurationImpl.java
@@ -28,41 +28,36 @@ public class ThrottleConfigurationImpl implements 
ThrottleConfiguration
 {
     public static final long DEFAULT_STREAM_REQUESTS_PER_SEC = 5000;
     public static final long DEFAULT_TIMEOUT_SEC = 10;
-    public static final long DEFAULT_DELAY_SEC = 5;
     public static final String STREAM_REQUESTS_PER_SEC_PROPERTY = 
"stream_requests_per_sec";
     public static final String TIMEOUT_SEC_PROPERTY = "timeout_sec";
-    public static final String DELAY_SEC_PROPERTY = "delay_sec";
 
     @JsonProperty(value = STREAM_REQUESTS_PER_SEC_PROPERTY)
     protected final long rateLimitStreamRequestsPerSecond;
     @JsonProperty(value = TIMEOUT_SEC_PROPERTY)
     protected final long timeoutInSeconds;
-    @JsonProperty(value = DELAY_SEC_PROPERTY)
-    protected final long delayInSeconds;
 
     public ThrottleConfigurationImpl()
     {
         this(DEFAULT_STREAM_REQUESTS_PER_SEC,
-             DEFAULT_TIMEOUT_SEC,
-             DEFAULT_DELAY_SEC);
+             DEFAULT_TIMEOUT_SEC);
     }
 
     public ThrottleConfigurationImpl(long rateLimitStreamRequestsPerSecond)
     {
         this(rateLimitStreamRequestsPerSecond,
-             DEFAULT_TIMEOUT_SEC,
-             DEFAULT_DELAY_SEC);
+             DEFAULT_TIMEOUT_SEC);
     }
 
     public ThrottleConfigurationImpl(long rateLimitStreamRequestsPerSecond,
-                                     long timeoutInSeconds,
-                                     long delayInSeconds)
+                                     long timeoutInSeconds)
     {
         this.rateLimitStreamRequestsPerSecond = 
rateLimitStreamRequestsPerSecond;
         this.timeoutInSeconds = timeoutInSeconds;
-        this.delayInSeconds = delayInSeconds;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     @JsonProperty(value = STREAM_REQUESTS_PER_SEC_PROPERTY)
     public long rateLimitStreamRequestsPerSecond()
@@ -70,17 +65,13 @@ public class ThrottleConfigurationImpl implements 
ThrottleConfiguration
         return rateLimitStreamRequestsPerSecond;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     @JsonProperty(value = TIMEOUT_SEC_PROPERTY)
     public long timeoutInSeconds()
     {
         return timeoutInSeconds;
     }
-
-    @Override
-    @JsonProperty(value = DELAY_SEC_PROPERTY)
-    public long delayInSeconds()
-    {
-        return delayInSeconds;
-    }
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java 
b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
index 9b798f6..960d116 100644
--- a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
+++ b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
@@ -28,7 +28,7 @@ import io.vertx.core.http.HttpServerResponse;
 import io.vertx.core.net.SocketAddress;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
 
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
  * Wrapper around HttpServerResponse
@@ -46,10 +46,11 @@ public class HttpResponse
         this.host = request.host();
     }
 
-    public void setRetryAfterHeader(long microsToWait)
+    public void setRetryAfterHeader(long waitTimeNanos)
     {
         response.setStatusCode(HttpResponseStatus.TOO_MANY_REQUESTS.code());
-        response.putHeader(HttpHeaderNames.RETRY_AFTER, 
Long.toString(MICROSECONDS.toSeconds(microsToWait))).end();
+        // round up when converting to second value
+        response.putHeader(HttpHeaderNames.RETRY_AFTER, 
Long.toString(NANOSECONDS.toSeconds(waitTimeNanos) + 1)).end();
     }
 
     public void setTooManyRequestsStatus()
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
index eb1b129..1825863 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.sidecar.utils;
 
-import java.time.Duration;
-import java.time.Instant;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.SidecarRateLimiter;
@@ -103,7 +101,14 @@ public class FileStreamer
     public Future<Void> stream(HttpResponse response, int instanceId, String 
filename, long fileLength, HttpRange range)
     {
         Promise<Void> promise = Promise.promise();
-        acquireAndSend(response, instanceId, filename, fileLength, range, 
Instant.now(), promise);
+        try
+        {
+            acquireAndSend(response, instanceId, filename, fileLength, range, 
System.nanoTime(), promise);
+        }
+        catch (Throwable t)
+        {
+            promise.tryFail(t);
+        }
         return promise.future();
     }
 
@@ -111,25 +116,25 @@ public class FileStreamer
      * Send the file if rate-limiting is disabled or when it successfully 
acquires a permit from the
      * {@link SidecarRateLimiter}.
      *
-     * @param response   the response to use
-     * @param instanceId  Cassandra instance from which file is streamed
-     * @param filename   the path to the file to serve
-     * @param fileLength the size of the file to serve
-     * @param range      the range to stream
-     * @param startTime  the start time of this request
-     * @param promise    a promise for the stream
+     * @param response          the response to use
+     * @param instanceId        Cassandra instance from which file is streamed
+     * @param filename          the path to the file to serve
+     * @param fileLength        the size of the file to serve
+     * @param range             the range to stream
+     * @param startTimeNanos    the start time of this request
+     * @param promise a promise for the stream
      */
     private void acquireAndSend(HttpResponse response,
                                 int instanceId,
                                 String filename,
                                 long fileLength,
                                 HttpRange range,
-                                Instant startTime,
+                                long startTimeNanos,
                                 Promise<Void> promise)
     {
         InstanceMetrics instanceMetrics = 
instanceMetadataFetcher.instance(instanceId).metrics();
         StreamSSTableMetrics streamSSTableMetrics = 
instanceMetrics.streamSSTable();
-        if (acquire(response, instanceId, filename, fileLength, range, 
startTime, streamSSTableMetrics, promise))
+        if (acquire(response, instanceId, filename, fileLength, range, 
startTimeNanos, streamSSTableMetrics, promise))
         {
             // Stream data if rate limiting is disabled or if we acquire
             LOGGER.debug("Streaming range {} for file {} to client {}. 
Instance: {}", range, filename,
@@ -158,55 +163,53 @@ public class FileStreamer
      * @param filename              the path to the file to serve
      * @param fileLength            the size of the file to serve
      * @param range                 the range to stream
-     * @param startTime             the start time of this request
+     * @param startTimeNanos        the start time of this request
      * @param streamSSTableMetrics  metrics captured during streaming of 
SSTables
      * @param promise               a promise for the stream
      * @return {@code true} if the permit was acquired, {@code false} otherwise
      */
     private boolean acquire(HttpResponse response, int instanceId, String 
filename, long fileLength, HttpRange range,
-                            Instant startTime, StreamSSTableMetrics 
streamSSTableMetrics, Promise<Void> promise)
+                            long startTimeNanos, StreamSSTableMetrics 
streamSSTableMetrics, Promise<Void> promise)
     {
         if (rateLimiter.tryAcquire())
             return true;
 
-        long microsToWait;
-        if (checkRetriesExhausted(startTime))
+        long waitTimeNanos = 
MICROSECONDS.toNanos(rateLimiter.queryWaitTimeInMicros());
+        if (isTimeoutExceeded(startTimeNanos, waitTimeNanos))
         {
-            LOGGER.error("Retries for acquiring permit exhausted for client 
{}. Instance: {}", response.remoteAddress(),
-                         response.host());
+            LOGGER.warn("Retries for acquiring permit exhausted for client {}. 
Instance: {}. " +
+                        "Asking client to retry after {} nanoseconds.", 
response.remoteAddress(), response.host(),
+                        waitTimeNanos);
+            response.setRetryAfterHeader(waitTimeNanos);
+            streamSSTableMetrics.throttled.metric.update(1);
             promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Retry 
exhausted"));
         }
-        else if ((microsToWait = rateLimiter.queryEarliestAvailable(0L))
-                 < TimeUnit.SECONDS.toMicros(config.delayInSeconds()))
+        else
         {
-            microsToWait = Math.max(0, microsToWait);
-
-            LOGGER.debug("Retrying streaming after {} micros for client {}. 
Instance: {}", microsToWait,
+            LOGGER.debug("Retrying streaming after {} nanos for client {}. 
Instance: {}", waitTimeNanos,
                          response.remoteAddress(), response.host());
             executorPools.service()
-                         .setTimer(MICROSECONDS.toMillis(microsToWait),
+                         // Note: adding an extra millisecond is required for 
2 reasons
+                         // 1. setTimer does not like scheduling with 0 delay; 
it throws
+                         // 2. the retry should be scheduled later than the 
waitTimeNanos, in order to ensure it can acquire
+                         
.setTimer(TimeUnit.NANOSECONDS.toMillis(waitTimeNanos) + 1,
                                    t -> acquireAndSend(response, instanceId, 
filename, fileLength, range,
-                                                       startTime, promise));
-        }
-        else
-        {
-            LOGGER.debug("Asking client {} to retry after {} micros. Instance: 
{}", response.remoteAddress(),
-                         microsToWait, response.host());
-            response.setRetryAfterHeader(microsToWait);
-            streamSSTableMetrics.throttled.metric.update(1);
-            promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Ask 
client to retry later"));
+                                                       startTimeNanos, 
promise));
         }
         return false;
     }
 
     /**
-     * @param startTime the request start time
-     * @return true if we exhausted the retries, false otherwise
+     * @param startTimeNanos the request start time in nanoseconds
+     * @param waitTimeNanos the estimated time to wait for the next available 
permit in nanoseconds
+     *
+     * @return true if we exceeded timeout, false otherwise
+     * Note: for this check we take wait time for the request into 
consideration
      */
-    private boolean checkRetriesExhausted(Instant startTime)
+    private boolean isTimeoutExceeded(long startTimeNanos, long waitTimeNanos)
     {
-        return startTime.plus(Duration.ofSeconds(config.timeoutInSeconds()))
-                        .isBefore(Instant.now());
+        long nowNanos = System.nanoTime();
+        return startTimeNanos + waitTimeNanos + 
TimeUnit.SECONDS.toNanos(config.timeoutInSeconds()) < nowNanos;
     }
 
     /**
diff --git 
a/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java 
b/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java
index afeb0a0..9ff27ea 100644
--- 
a/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java
+++ 
b/src/test/java/com/google/common/util/concurrent/SidecarRateLimiterTest.java
@@ -18,6 +18,8 @@
 
 package com.google.common.util.concurrent;
 
+import java.util.concurrent.TimeUnit;
+
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -37,13 +39,14 @@ class SidecarRateLimiterTest
         assertThat(enabledRateLimiter.tryAcquire()).isTrue();
         enabledRateLimiter.rate(150);
         assertThat(enabledRateLimiter.rate()).isEqualTo(150);
-        
assertThat(enabledRateLimiter.queryEarliestAvailable(0)).isGreaterThan(0);
+        enabledRateLimiter.acquire(200);
+        
assertThat(enabledRateLimiter.queryWaitTimeInMicros()).isGreaterThan(0);
 
         // Creates a SidecarRateLimiter that is disabled
         SidecarRateLimiter disabledRateLimiter = SidecarRateLimiter.create(-1);
         assertThat(disabledRateLimiter).isNotNull();
         assertThat(disabledRateLimiter.rate()).isEqualTo(0);
-        
assertThat(disabledRateLimiter.queryEarliestAvailable(1000L)).isEqualTo(0);
+        assertThat(disabledRateLimiter.queryWaitTimeInMicros()).isEqualTo(0);
     }
 
     @Test
@@ -83,4 +86,41 @@ class SidecarRateLimiterTest
         assertThat(rateLimiter.acquire(5)).isEqualTo(0);
         assertThat(rateLimiter.acquire(500)).isNotEqualTo(0);
     }
+
+    @Test
+    void testWaitTimeReturned()
+    {
+        SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(10);
+
+        rateLimiter.acquire(10);
+        
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(500000);
+
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+        rateLimiter.acquire(100);
+        
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(8000000);
+    }
+
+    @Test
+    void testClockResetWithRateUpdate()
+    {
+        SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(-1);
+        rateLimiter.acquire(2000);
+        assertThat(rateLimiter.queryWaitTimeInMicros()).isEqualTo(0);
+
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+        rateLimiter.rate(1);
+        rateLimiter.acquire(4);
+        
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(3000000);
+    }
+
+    @Test
+    void testExhaustingPermits()
+    {
+        SidecarRateLimiter rateLimiter = SidecarRateLimiter.create(1);
+        rateLimiter.acquire(20);
+
+        
assertThat(rateLimiter.queryWaitTimeInMicros()).isGreaterThanOrEqualTo(TimeUnit.SECONDS.toMicros(10));
+    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java 
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 49dbef3..afb94f7 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -104,7 +104,7 @@ public class TestModule extends AbstractModule
 
     protected SidecarConfigurationImpl abstractConfig(SslConfiguration 
sslConfiguration)
     {
-        ThrottleConfiguration throttleConfiguration = new 
ThrottleConfigurationImpl(1, 10, 5);
+        ThrottleConfiguration throttleConfiguration = new 
ThrottleConfigurationImpl(5, 5);
         SSTableUploadConfiguration uploadConfiguration = new 
SSTableUploadConfigurationImpl(0F);
         SchemaKeyspaceConfiguration schemaKeyspaceConfiguration =
         SchemaKeyspaceConfigurationImpl.builder()
diff --git a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java 
b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
index 9627d37..f45deb4 100644
--- a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.cassandra.sidecar;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -29,11 +30,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.SharedMetricRegistries;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.util.Modules;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
 import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
 import io.vertx.ext.web.client.HttpResponse;
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.ext.web.codec.BodyCodec;
@@ -43,6 +47,7 @@ import 
org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl;
 import org.apache.cassandra.sidecar.metrics.instance.StreamSSTableMetrics;
 import org.apache.cassandra.sidecar.server.MainModule;
 import org.apache.cassandra.sidecar.server.Server;
+import org.assertj.core.data.Offset;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
@@ -76,8 +81,7 @@ public class ThrottleTest
     void tearDown() throws InterruptedException
     {
         CountDownLatch closeLatch = new CountDownLatch(1);
-        registry().removeMatching((name, metric) -> true);
-        registry(1).removeMatching((name, metric) -> true);
+        SharedMetricRegistries.clear();
         server.close().onSuccess(res -> closeLatch.countDown());
         if (closeLatch.await(60, SECONDS))
             logger.info("Close event received before timeout.");
@@ -86,48 +90,37 @@ public class ThrottleTest
     }
 
     @Test
-    void testStreamRequestsThrottled() throws Exception
+    void testStreamRequestsThrottled(VertxTestContext context) throws Exception
     {
         String testRoute = 
"/keyspaces/TestKeyspace/tables/TestTable-54ea95cebba24e0aa9bee428e5d7160b/snapshots"
 +
                            
"/TestSnapshot/components/nb-1-big-Data.db?dataDirectoryIndex=0";
 
-        for (int i = 0; i < 20; i++)
-        {
-            unblockingClientRequest(testRoute);
-        }
-
-        StreamSSTableMetrics streamSSTableMetrics = new 
InstanceMetricsImpl(registry(1)).streamSSTable();
-
-        HttpResponse response = blockingClientRequest(testRoute);
-        
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.TOO_MANY_REQUESTS.code());
-        
assertThat(streamSSTableMetrics.throttled.metric.getValue()).isGreaterThanOrEqualTo(1);
-
-        long secsToWait = Long.parseLong(response.getHeader("Retry-After"));
-        Thread.sleep(SECONDS.toMillis(secsToWait));
-
-        HttpResponse finalResp = blockingClientRequest(testRoute);
-        
assertThat(finalResp.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
-        assertThat(finalResp.bodyAsString()).isEqualTo("data");
-    }
-
-    private void unblockingClientRequest(String route)
-    {
-        WebClient client = WebClient.create(vertx);
-        client.get(server.actualPort(), "localhost", "/api/v1" + route)
-              .as(BodyCodec.buffer())
-              .send(resp ->
-                    {
-                        // do nothing
-                    });
+        long startTime = System.nanoTime();
+        List<Future<HttpResponse<Buffer>>> responseFutures = 
IntStream.range(0, 50)
+                                                                      
.mapToObj(i -> sendRequest(testRoute))
+                                                                      
.collect(Collectors.toList());
+
+        Future.join(responseFutures)
+              .onComplete(context.succeeding(combinedResp -> {
+                  long elapsedNanos = System.nanoTime() - startTime;
+                  long okResponse = responseFutures.stream()
+                                                   .filter(resp -> 
resp.result() != null && resp.result().statusCode() == 
HttpResponseStatus.OK.code())
+                                                   .count();
+                  double rate = okResponse * SECONDS.toNanos(1) / (double) 
elapsedNanos;
+                  assertThat(rate).as("Rate is expected to be 5 requests per 
second. rate=" + rate + " RPS")
+                                  .isEqualTo(5D, Offset.offset(2D));
+                  StreamSSTableMetrics streamSSTableMetrics = new 
InstanceMetricsImpl(registry(1)).streamSSTable();
+                  
assertThat(streamSSTableMetrics.throttled.metric.getValue()).isGreaterThanOrEqualTo(5);
+                  context.completeNow();
+              }));
+        context.awaitCompletion(30, SECONDS);
     }
 
-    private HttpResponse blockingClientRequest(String route) throws 
ExecutionException, InterruptedException
+    private Future<HttpResponse<Buffer>> sendRequest(String route)
     {
         WebClient client = WebClient.create(vertx);
-        CompletableFuture<HttpResponse> future = new CompletableFuture<>();
-        client.get(server.actualPort(), "localhost", "/api/v1" + route)
-              .as(BodyCodec.buffer())
-              .send(resp -> future.complete(resp.result()));
-        return future.get();
+        return client.get(server.actualPort(), "localhost", "/api/v1" + route)
+                     .as(BodyCodec.buffer())
+                     .send();
     }
 }
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
index ba8940f..ad8daae 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
@@ -394,7 +394,6 @@ class SidecarConfigurationTest
 
         assertThat(throttle).isNotNull();
         
assertThat(throttle.rateLimitStreamRequestsPerSecond()).isEqualTo(5000);
-        assertThat(throttle.delayInSeconds()).isEqualTo(5);
         assertThat(throttle.timeoutInSeconds()).isEqualTo(10);
 
         // validate traffic shaping options
diff --git a/src/test/resources/config/sidecar_file_permissions.yaml 
b/src/test/resources/config/sidecar_file_permissions.yaml
index d1aeabe..7ab200e 100644
--- a/src/test/resources/config/sidecar_file_permissions.yaml
+++ b/src/test/resources/config/sidecar_file_permissions.yaml
@@ -70,7 +70,6 @@ sidecar:
   request_timeout_millis: 300000
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   sstable_upload:
     concurrent_upload_limit: 80
diff --git a/src/test/resources/config/sidecar_invalid_client_auth.yaml 
b/src/test/resources/config/sidecar_invalid_client_auth.yaml
index 5ca9dfa..6bb88e2 100644
--- a/src/test/resources/config/sidecar_invalid_client_auth.yaml
+++ b/src/test/resources/config/sidecar_invalid_client_auth.yaml
@@ -72,7 +72,6 @@ sidecar:
   accept_backlog: 1024
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   sstable_upload:
     concurrent_upload_limit: 80
diff --git a/src/test/resources/config/sidecar_invalid_file_permissions.yaml 
b/src/test/resources/config/sidecar_invalid_file_permissions.yaml
index ce192ce..81928a8 100644
--- a/src/test/resources/config/sidecar_invalid_file_permissions.yaml
+++ b/src/test/resources/config/sidecar_invalid_file_permissions.yaml
@@ -70,7 +70,6 @@ sidecar:
   request_timeout_millis: 300000
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   sstable_upload:
     concurrent_upload_limit: 80
diff --git a/src/test/resources/config/sidecar_metrics.yaml 
b/src/test/resources/config/sidecar_metrics.yaml
index fb7636c..851e6b5 100644
--- a/src/test/resources/config/sidecar_metrics.yaml
+++ b/src/test/resources/config/sidecar_metrics.yaml
@@ -39,7 +39,6 @@ sidecar:
   port: 0
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   allowable_time_skew_in_minutes: 60
   jmx:
diff --git a/src/test/resources/config/sidecar_metrics_empty_filters.yaml 
b/src/test/resources/config/sidecar_metrics_empty_filters.yaml
index d2af3b7..6aaa548 100644
--- a/src/test/resources/config/sidecar_metrics_empty_filters.yaml
+++ b/src/test/resources/config/sidecar_metrics_empty_filters.yaml
@@ -39,7 +39,6 @@ sidecar:
   port: 0
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   allowable_time_skew_in_minutes: 60
   jmx:
diff --git a/src/test/resources/config/sidecar_missing_jmx.yaml 
b/src/test/resources/config/sidecar_missing_jmx.yaml
index 6e8a6db..b9193dd 100644
--- a/src/test/resources/config/sidecar_missing_jmx.yaml
+++ b/src/test/resources/config/sidecar_missing_jmx.yaml
@@ -73,7 +73,6 @@ sidecar:
   server_verticle_instances: 1
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml 
b/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml
index 5269370..7d9c6d5 100644
--- a/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml
+++ b/src/test/resources/config/sidecar_missing_sstable_snapshot.yaml
@@ -73,7 +73,6 @@ sidecar:
   server_verticle_instances: 1
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_multiple_instances.yaml 
b/src/test/resources/config/sidecar_multiple_instances.yaml
index 9a43710..ffb1dcc 100644
--- a/src/test/resources/config/sidecar_multiple_instances.yaml
+++ b/src/test/resources/config/sidecar_multiple_instances.yaml
@@ -73,7 +73,6 @@ sidecar:
   server_verticle_instances: 1
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_no_local_instances.yaml 
b/src/test/resources/config/sidecar_no_local_instances.yaml
index 8c70e6d..83f0bd5 100644
--- a/src/test/resources/config/sidecar_no_local_instances.yaml
+++ b/src/test/resources/config/sidecar_no_local_instances.yaml
@@ -13,7 +13,6 @@ sidecar:
   server_verticle_instances: 2
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 500
diff --git 
a/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml 
b/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml
index a1f45c3..443958e 100644
--- a/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml
+++ b/src/test/resources/config/sidecar_schema_keyspace_configuration.yaml
@@ -26,7 +26,6 @@ sidecar:
   server_verticle_instances: 2
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_single_instance.yaml 
b/src/test/resources/config/sidecar_single_instance.yaml
index f47cce3..257865e 100644
--- a/src/test/resources/config/sidecar_single_instance.yaml
+++ b/src/test/resources/config/sidecar_single_instance.yaml
@@ -26,7 +26,6 @@ sidecar:
   server_verticle_instances: 2
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_ssl.yaml 
b/src/test/resources/config/sidecar_ssl.yaml
index c61b008..b2c8cb8 100644
--- a/src/test/resources/config/sidecar_ssl.yaml
+++ b/src/test/resources/config/sidecar_ssl.yaml
@@ -73,7 +73,6 @@ sidecar:
   server_verticle_instances: 1
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   traffic_shaping:
     inbound_global_bandwidth_bps: 500
diff --git a/src/test/resources/config/sidecar_validation_configuration.yaml 
b/src/test/resources/config/sidecar_validation_configuration.yaml
index 2d6364f..40ba742 100644
--- a/src/test/resources/config/sidecar_validation_configuration.yaml
+++ b/src/test/resources/config/sidecar_validation_configuration.yaml
@@ -18,7 +18,6 @@ sidecar:
   request_timeout_millis: 1200000
   throttle:
     stream_requests_per_sec: 80
-    delay_sec: 7
     timeout_sec: 21
   allowable_time_skew_in_minutes: 89
   sstable_import:
diff --git 
a/src/test/resources/config/sidecar_with_single_multiple_instances.yaml 
b/src/test/resources/config/sidecar_with_single_multiple_instances.yaml
index a207884..40f1375 100644
--- a/src/test/resources/config/sidecar_with_single_multiple_instances.yaml
+++ b/src/test/resources/config/sidecar_with_single_multiple_instances.yaml
@@ -36,7 +36,6 @@ sidecar:
   port: 9043
   throttle:
     stream_requests_per_sec: 5000
-    delay_sec: 5
     timeout_sec: 10
   allowable_time_skew_in_minutes: 60
   jmx:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to