This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 50ca2c8c73b MINOR: Fix Trogdor Off-By-One Errors. (#17095)
50ca2c8c73b is described below
commit 50ca2c8c73b1d9a77c46c1f88e83254ec4bcb821
Author: Scott Hendricks <[email protected]>
AuthorDate: Thu Sep 5 16:39:26 2024 -0400
MINOR: Fix Trogdor Off-By-One Errors. (#17095)
Upon further inspection of the ConfigurableProducerWorker, I noticed that
there is an off-by-one error that can cause us to greatly exceed the target
messages per second.
I created a test harness so I could quickly evaluate with and without this
change.
With this change, the test harness outputs sane values:
GaussianThroughputGenerator throttle = new GaussianThroughputGenerator(350, 35, 100, 100);
... Output:
Changing Throttle: 323 ==> 318
Rate: 3513 Messages / Second
Changing Throttle: 318 ==> 318
Rate: 3510 Messages / Second
Changing Throttle: 318 ==> 333
Rate: 3506 Messages / Second
Changing Throttle: 333 ==> 352
Rate: 3505 Messages / Second
Changing Throttle: 352 ==> 356
Rate: 3505 Messages / Second
Changing Throttle: 356 ==> 302
Rate: 3505 Messages / Second
Changing Throttle: 302 ==> 347
Rate: 3501 Messages / Second
Changing Throttle: 347 ==> 397
Rate: 3501 Messages / Second
Without this change, the throttle thrashes, the rate can skyrocket, and
unintended code paths can be executed.
GaussianThroughputGenerator throttle = new GaussianThroughputGenerator(350, 35, 100, 100);
... Output:
Changing Throttle: 374 ==> 314
Changing Throttle: 314 ==> 346
Changing Throttle: 346 ==> 340
Changing Throttle: 340 ==> 382
Changing Throttle: 382 ==> 377
Changing Throttle: 377 ==> 352
Changing Throttle: 352 ==> 397
Changing Throttle: 397 ==> 335
Rate: 4468 Messages / Second
Changing Throttle: 335 ==> 398
Changing Throttle: 398 ==> 345
Changing Throttle: 345 ==> 381
Changing Throttle: 381 ==> 334
Changing Throttle: 334 ==> 303
Changing Throttle: 303 ==> 359
Changing Throttle: 359 ==> 353
Changing Throttle: 353 ==> 422
Changing Throttle: 422 ==> 274
Changing Throttle: 274 ==> 317
Rate: 4733 Messages / Second
Changing Throttle: 317 ==> 316
Changing Throttle: 316 ==> 392
Changing Throttle: 392 ==> 342
Changing Throttle: 342 ==> 429
Changing Throttle: 429 ==> 305
Changing Throttle: 305 ==> 389
Reviewers: Justine Olshan <[email protected]>
---
.../apache/kafka/trogdor/workload/GaussianThroughputGenerator.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
index 5e064f58e41..2c00104fef0 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
@@ -111,7 +111,7 @@ public class GaussianThroughputGenerator implements ThroughputGenerator {
         // Calculate the next window start time.
         long now = Time.SYSTEM.milliseconds();
         if (nextWindowStarts > 0) {
-            while (nextWindowStarts < now) {
+            while (nextWindowStarts <= now) {
                 nextWindowStarts += windowSizeMs;
             }
         } else {
@@ -119,7 +119,7 @@ public class GaussianThroughputGenerator implements ThroughputGenerator {
         }
         // Check the windows between rate changes.
-        if ((windowTracker > windowsUntilRateChange) || force) {
+        if ((windowTracker >= windowsUntilRateChange) || force) {
             windowTracker = 0;
             // Calculate the number of messages allowed in this window using a normal distribution.
@@ -146,6 +146,9 @@ public class GaussianThroughputGenerator implements ThroughputGenerator {
             while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
                 wait(nextWindowStarts - Time.SYSTEM.milliseconds());
             }
+
+            // Calculate the next window now.
+            calculateNextWindow(false);
         }
     }
 }
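
The second hunk changes how many windows elapse between rate changes. The sketch below illustrates the difference between the two comparisons. It is not the Trogdor implementation: windowsBetweenChanges() is a hypothetical helper, and it assumes the counter is incremented once per window before the check, which may not match the ordering in the real method.

```java
public class RateChangeCounterSketch {
    // Hypothetical helper (not part of Trogdor): count how many windows
    // elapse before a rate change is triggered, assuming the tracker is
    // incremented once per window and then compared against the limit.
    static int windowsBetweenChanges(int windowsUntilRateChange, boolean inclusive) {
        int windowTracker = 0;
        int windows = 0;
        while (true) {
            windows++;        // one window elapses
            windowTracker++;  // assumed: tracker is bumped once per window
            boolean change = inclusive
                ? windowTracker >= windowsUntilRateChange
                : windowTracker > windowsUntilRateChange;
            if (change) {
                return windows;
            }
        }
    }

    public static void main(String[] args) {
        int windowsUntilRateChange = 100; // matches the last constructor argument in the commit message

        System.out.println("strict (>):     " + windowsBetweenChanges(windowsUntilRateChange, false) + " windows");
        System.out.println("inclusive (>=): " + windowsBetweenChanges(windowsUntilRateChange, true) + " windows");
    }
}
```

Under these assumptions the strict comparison triggers a change after 101 windows, while the inclusive comparison triggers it after the configured 100.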