This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50ca2c8c73b MINOR: Fix Trogdor Off-By-One Errors. (#17095)
50ca2c8c73b is described below

commit 50ca2c8c73b1d9a77c46c1f88e83254ec4bcb821
Author: Scott Hendricks <[email protected]>
AuthorDate: Thu Sep 5 16:39:26 2024 -0400

    MINOR: Fix Trogdor Off-By-One Errors. (#17095)
    
    Upon further inspection of the ConfigurableProducerWorker, I noticed that there is an off-by-one error that can cause us to greatly exceed the target messages per second.
    
    I created a test harness so I could quickly evaluate with and without this change.
    
    With this change, the test harness outputs sane values:
    
    GaussianThroughputGenerator throttle = new GaussianThroughputGenerator(350, 35, 100, 100);
    ... Output:
    Changing Throttle: 323 ==> 318
    Rate: 3513 Messages / Second
    Changing Throttle: 318 ==> 318
    Rate: 3510 Messages / Second
    Changing Throttle: 318 ==> 333
    Rate: 3506 Messages / Second
    Changing Throttle: 333 ==> 352
    Rate: 3505 Messages / Second
    Changing Throttle: 352 ==> 356
    Rate: 3505 Messages / Second
    Changing Throttle: 356 ==> 302
    Rate: 3505 Messages / Second
    Changing Throttle: 302 ==> 347
    Rate: 3501 Messages / Second
    Changing Throttle: 347 ==> 397
    Rate: 3501 Messages / Second
    
    Without this change, the throttle thrashes, the values can skyrocket, and unintentional code paths can be called.
    
    GaussianThroughputGenerator throttle = new GaussianThroughputGenerator(350, 35, 100, 100);
    ... Output:
    Changing Throttle: 374 ==> 314
    Changing Throttle: 314 ==> 346
    Changing Throttle: 346 ==> 340
    Changing Throttle: 340 ==> 382
    Changing Throttle: 382 ==> 377
    Changing Throttle: 377 ==> 352
    Changing Throttle: 352 ==> 397
    Changing Throttle: 397 ==> 335
    Rate: 4468 Messages / Second
    Changing Throttle: 335 ==> 398
    Changing Throttle: 398 ==> 345
    Changing Throttle: 345 ==> 381
    Changing Throttle: 381 ==> 334
    Changing Throttle: 334 ==> 303
    Changing Throttle: 303 ==> 359
    Changing Throttle: 359 ==> 353
    Changing Throttle: 353 ==> 422
    Changing Throttle: 422 ==> 274
    Changing Throttle: 274 ==> 317
    Rate: 4733 Messages / Second
    Changing Throttle: 317 ==> 316
    Changing Throttle: 316 ==> 392
    Changing Throttle: 392 ==> 342
    Changing Throttle: 342 ==> 429
    Changing Throttle: 429 ==> 305
    Changing Throttle: 305 ==> 389
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../apache/kafka/trogdor/workload/GaussianThroughputGenerator.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
index 5e064f58e41..2c00104fef0 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
@@ -111,7 +111,7 @@ public class GaussianThroughputGenerator implements ThroughputGenerator {
         // Calculate the next window start time.
         long now = Time.SYSTEM.milliseconds();
         if (nextWindowStarts > 0) {
-            while (nextWindowStarts < now) {
+            while (nextWindowStarts <= now) {
                 nextWindowStarts += windowSizeMs;
             }
         } else {
@@ -119,7 +119,7 @@ public class GaussianThroughputGenerator implements ThroughputGenerator {
         }
 
         // Check the windows between rate changes.
-        if ((windowTracker > windowsUntilRateChange) || force) {
+        if ((windowTracker >= windowsUntilRateChange) || force) {
             windowTracker = 0;
 
             // Calculate the number of messages allowed in this window using a normal distribution.
@@ -146,6 +146,9 @@ public class GaussianThroughputGenerator implements ThroughputGenerator {
             while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
                 wait(nextWindowStarts - Time.SYSTEM.milliseconds());
             }
+
+            // Calculate the next window now.
+            calculateNextWindow(false);
         }
     }
 }
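
For readers who want to see the two boundary conditions in isolation, here is a minimal standalone sketch. It is not the Trogdor code: the class name WindowBoundarySketch and both helper methods are hypothetical. It also assumes the window counter is incremented once per window after the rate-change check, which the hunks above do not show.

```java
public class WindowBoundarySketch {

    // Advance a window start time until it is past 'now'.
    // inclusive == false mirrors the old '<' test; inclusive == true mirrors the new '<=' test.
    static long advance(long nextWindowStarts, long now, long windowSizeMs, boolean inclusive) {
        while (inclusive ? nextWindowStarts <= now : nextWindowStarts < now) {
            nextWindowStarts += windowSizeMs;
        }
        return nextWindowStarts;
    }

    // Count how many windows pass before a rate change fires.
    // Assumption: the counter starts at 0 and is incremented once per window after the check.
    // inclusive == false mirrors the old '>' test; inclusive == true mirrors the new '>=' test.
    static int windowsBeforeChange(int windowsUntilRateChange, boolean inclusive) {
        int windowTracker = 0;
        int windows = 0;
        while (true) {
            boolean fire = inclusive
                ? windowTracker >= windowsUntilRateChange
                : windowTracker > windowsUntilRateChange;
            if (fire) {
                return windows;
            }
            windowTracker += 1;
            windows += 1;
        }
    }

    public static void main(String[] args) {
        // 'now' lands exactly on the window boundary, as it can right after a wait ends.
        System.out.println(advance(1000L, 1000L, 100L, false)); // window start is not advanced
        System.out.println(advance(1000L, 1000L, 100L, true));  // window start moves forward one window
        System.out.println(windowsBeforeChange(100, false));    // one window more than configured
        System.out.println(windowsBeforeChange(100, true));     // exactly the configured count
    }
}
```

In this sketch, the strict comparison leaves the window start equal to the current time when the clock lands exactly on the boundary, so the window is already over by the time it is returned. The inclusive comparison moves it one full window ahead.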
