This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 562d4463ecc [fix][test] Stabilize PublishRateLimiterOverconsumingTest
by aligning measurement and using adjacent 2-sec averages (#24864)
562d4463ecc is described below
commit 562d4463ecc5ac15cb0d9fbe380fd49664530a3e
Author: sinan liu <[email protected]>
AuthorDate: Fri Oct 17 19:33:22 2025 +0800
[fix][test] Stabilize PublishRateLimiterOverconsumingTest by aligning
measurement and using adjacent 2-sec averages (#24864)
---
.../PublishRateLimiterOverconsumingTest.java | 72 +++++++++++++++-------
1 file changed, 50 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java
index fe30e7c7873..866df645d3f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java
@@ -79,7 +79,8 @@ public class PublishRateLimiterOverconsumingTest extends
BrokerTestBase {
int durationSeconds = 5;
int numberOfConsumers = 5;
int numberOfProducersWithIndependentClients = 5;
- int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds +
1)) / 5;
+ int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds +
1))
+ / numberOfProducersWithIndependentClients;
// configure dispatch throttling rate
BrokerService brokerService = pulsar.getBrokerService();
@@ -102,12 +103,12 @@ public class PublishRateLimiterOverconsumingTest extends
BrokerTestBase {
AtomicInteger lastCalculatedSecond = new AtomicInteger(0);
List<Integer> collectedRatesForEachSecond =
Collections.synchronizedList(new ArrayList<>());
- // track actual consuming rate of messages per second
+ // Track actual consuming rate of messages per second
+ // (After aligning the first message, start counting whole seconds)
Runnable rateTracker = () -> {
long startTime = startTimeNanos.get();
if (startTime == 0) {
- startTimeNanos.compareAndSet(0, System.nanoTime());
- startTime = startTimeNanos.get();
+ return;
}
long durationNanos = System.nanoTime() - startTime;
int elapsedFullSeconds = (int) (durationNanos / 1e9);
@@ -124,10 +125,15 @@ public class PublishRateLimiterOverconsumingTest extends
BrokerTestBase {
ScheduledFuture<?> scheduledFuture =
executor.scheduleAtFixedRate(rateTracker, 0, 500, TimeUnit.MILLISECONDS);
// message listener implementation used for all consumers
+ // Set startTime when the first message arrives; then accumulate the
counter
MessageListener<Integer> messageListener = new MessageListener<>() {
@Override
public void received(Consumer<Integer> consumer, Message<Integer>
msg) {
- lastReceivedMessageTimeNanos.set(System.nanoTime());
+ long now = System.nanoTime();
+ if (startTimeNanos.get() == 0) {
+ startTimeNanos.compareAndSet(0, now);
+ }
+ lastReceivedMessageTimeNanos.set(now);
currentSecondMessagesCount.incrementAndGet();
totalMessagesReceived.incrementAndGet();
consumer.acknowledgeAsync(msg);
@@ -233,33 +239,55 @@ public class PublishRateLimiterOverconsumingTest extends
BrokerTestBase {
});
};
- // wait for results
+ // Wait for all messages to be consumed
Awaitility.await()
.atMost(Duration.ofSeconds(durationSeconds * 2))
.pollInterval(Duration.ofMillis(100))
- .untilAsserted(
- () ->
assertThat(collectedRatesForEachSecond).hasSizeGreaterThanOrEqualTo(durationSeconds));
+ .untilAsserted(() ->
+ assertThat(totalMessagesReceived.get())
+
.isEqualTo(numberOfProducersWithIndependentClients *
numberOfMessagesForEachProducer));
+
+ // Collect per-second windows, and add the last half-second remainder
List<Integer> collectedRatesSnapshot = new
ArrayList<>(collectedRatesForEachSecond);
+ int tail = currentSecondMessagesCount.getAndSet(0);
+ if (tail > 0) {
+ collectedRatesSnapshot.add(tail);
+ }
log.info("Collected rates for each second: {}",
collectedRatesSnapshot);
- long avgMsgRate =
- totalMessagesReceived.get() / TimeUnit.NANOSECONDS.toSeconds(
- lastReceivedMessageTimeNanos.get() -
startTimeNanos.get());
- log.info("Average rate during the test run: {} msg/s", avgMsgRate);
+
+ // Calculate the average using second-by-second windows:
+ // Skip the first second, take up to durationSeconds windows.
+ int usable = Math.min(durationSeconds, Math.max(0,
collectedRatesSnapshot.size() - 1));
+ double windowedAvg = (usable > 0)
+ ? collectedRatesSnapshot.subList(1, 1 +
usable).stream().mapToInt(Integer::intValue).average().orElse(0)
+ : 0;
assertSoftly(softly -> {
- // check the rate during the test run
- softly.assertThat(avgMsgRate).describedAs("average rate during the
test run")
- // allow rate in 40% range
+ // Overall average (window mean, ±40%)
+ softly.assertThat(windowedAvg)
+ .describedAs("windowed average rate during the test run")
.isCloseTo(rateInMsg, Percentage.withPercentage(40));
- // check that rates were collected
- // skip the first element as it might contain messages for first 2
seconds
- softly.assertThat(collectedRatesSnapshot.subList(1,
collectedRatesSnapshot.size() - 1))
- .describedAs("actual rates for each second")
- .allSatisfy(rates -> {
- assertThat(rates).describedAs("actual rate for each
second")
+ // Per second (average of two adjacent seconds, skipping only the first
pair; the last pair is included, ±55%)
+ if (collectedRatesSnapshot.size() >= 4) {
+ // Starting from (1, 2), ending at (size-2, size-1)
+ for (int i = 2; i < collectedRatesSnapshot.size(); i++) {
+ int avg2 = (collectedRatesSnapshot.get(i - 1) +
collectedRatesSnapshot.get(i)) / 2;
+ softly.assertThat(avg2)
+ .describedAs("Average of second %d and %d", i, i +
1)
+ .isCloseTo(rateInMsg,
Percentage.withPercentage(55));
+ }
+ } else {
+ // Degenerates to: Skip head and tail with 50% tolerance
(avoid false positives)
+ // when there are too few windows
+ if (collectedRatesSnapshot.size() > 2) {
+ for (int i = 1; i < collectedRatesSnapshot.size() - 1;
i++) {
+ softly.assertThat(collectedRatesSnapshot.get(i))
+ .describedAs("rate of second %d (fallback
check)", i + 1)
.isCloseTo(rateInMsg,
Percentage.withPercentage(50));
- });
+ }
+ }
+ }
});
scheduledFuture.cancel(true);
producersClose.close();