This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2d7ce2600f7fd162a7b287e5264150336e1917d5 Author: sinan liu <[email protected]> AuthorDate: Fri Oct 17 19:33:22 2025 +0800 [fix][test] Stabilize PublishRateLimiterOverconsumingTest by aligning measurement and using adjacent 2-sec averages (#24864) (cherry picked from commit 562d4463ecc5ac15cb0d9fbe380fd49664530a3e) --- .../PublishRateLimiterOverconsumingTest.java | 72 +++++++++++++++------- 1 file changed, 50 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java index fe30e7c7873..866df645d3f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java @@ -79,7 +79,8 @@ public class PublishRateLimiterOverconsumingTest extends BrokerTestBase { int durationSeconds = 5; int numberOfConsumers = 5; int numberOfProducersWithIndependentClients = 5; - int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds + 1)) / 5; + int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds + 1)) + / numberOfProducersWithIndependentClients; // configure dispatch throttling rate BrokerService brokerService = pulsar.getBrokerService(); @@ -102,12 +103,12 @@ public class PublishRateLimiterOverconsumingTest extends BrokerTestBase { AtomicInteger lastCalculatedSecond = new AtomicInteger(0); List<Integer> collectedRatesForEachSecond = Collections.synchronizedList(new ArrayList<>()); - // track actual consuming rate of messages per second + // rack actual consuming rate of messages per second + // (After aligning the first message, start counting whole seconds) Runnable rateTracker = () -> { long startTime = startTimeNanos.get(); if (startTime == 0) { - startTimeNanos.compareAndSet(0, System.nanoTime()); - startTime = startTimeNanos.get(); + return; } long durationNanos = System.nanoTime() - startTime; int elapsedFullSeconds = (int) (durationNanos / 1e9); @@ -124,10 +125,15 @@ public class PublishRateLimiterOverconsumingTest extends BrokerTestBase { ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(rateTracker, 0, 500, TimeUnit.MILLISECONDS); // message listener implementation used for all consumers + // Set startTime when the first message arrives; then accumulate the counter MessageListener<Integer> messageListener = new MessageListener<>() { @Override public void received(Consumer<Integer> consumer, Message<Integer> msg) { - lastReceivedMessageTimeNanos.set(System.nanoTime()); + long now = System.nanoTime(); + if (startTimeNanos.get() == 0) { + startTimeNanos.compareAndSet(0, now); + } + lastReceivedMessageTimeNanos.set(now); currentSecondMessagesCount.incrementAndGet(); totalMessagesReceived.incrementAndGet(); consumer.acknowledgeAsync(msg); @@ -233,33 +239,55 @@ public class PublishRateLimiterOverconsumingTest extends BrokerTestBase { }); }; - // wait for results + // Wait for all messages to be consumed Awaitility.await() .atMost(Duration.ofSeconds(durationSeconds * 2)) .pollInterval(Duration.ofMillis(100)) - .untilAsserted( - () -> assertThat(collectedRatesForEachSecond).hasSizeGreaterThanOrEqualTo(durationSeconds)); + .untilAsserted(() -> + assertThat(totalMessagesReceived.get()) + .isEqualTo(numberOfProducersWithIndependentClients * numberOfMessagesForEachProducer)); + + // Collect per-second windows, and add the last half-second remainder List<Integer> collectedRatesSnapshot = new ArrayList<>(collectedRatesForEachSecond); + int tail = currentSecondMessagesCount.getAndSet(0); + if (tail > 0) { + collectedRatesSnapshot.add(tail); + } log.info("Collected rates for each second: {}", collectedRatesSnapshot); - long avgMsgRate = - totalMessagesReceived.get() / TimeUnit.NANOSECONDS.toSeconds( - lastReceivedMessageTimeNanos.get() - startTimeNanos.get()); - log.info("Average rate during the test run: {} msg/s", avgMsgRate); + + // Calculate the average using second-by-second windows: + // Skip the first second, take up to durationSeconds windows. + int usable = Math.min(durationSeconds, Math.max(0, collectedRatesSnapshot.size() - 1)); + double windowedAvg = (usable > 0) + ? collectedRatesSnapshot.subList(1, 1 + usable).stream().mapToInt(Integer::intValue).average().orElse(0) + : 0; assertSoftly(softly -> { - // check the rate during the test run - softly.assertThat(avgMsgRate).describedAs("average rate during the test run") - // allow rate in 40% range + // Overall average (window mean, ±40%) + softly.assertThat(windowedAvg) + .describedAs("windowed average rate during the test run") .isCloseTo(rateInMsg, Percentage.withPercentage(40)); - // check that rates were collected - // skip the first element as it might contain messages for first 2 seconds - softly.assertThat(collectedRatesSnapshot.subList(1, collectedRatesSnapshot.size() - 1)) - .describedAs("actual rates for each second") - .allSatisfy(rates -> { - assertThat(rates).describedAs("actual rate for each second") + // Per second (average of two adjacent seconds, skip first/last pairs, ±55%) + if (collectedRatesSnapshot.size() >= 4) { + // Starting from (1, 2), ending at (size-2, size-1) + for (int i = 2; i < collectedRatesSnapshot.size(); i++) { + int avg2 = (collectedRatesSnapshot.get(i - 1) + collectedRatesSnapshot.get(i)) / 2; + softly.assertThat(avg2) + .describedAs("Average of second %d and %d", i, i + 1) + .isCloseTo(rateInMsg, Percentage.withPercentage(55)); + } + } else { + // Degenerates to: Skip head and tail with 50% tolerance (avoid false positives) + // when there are too few windows + if (collectedRatesSnapshot.size() > 2) { + for (int i = 1; i < collectedRatesSnapshot.size() - 1; i++) { + softly.assertThat(collectedRatesSnapshot.get(i)) + .describedAs("rate of second %d (fallback check)", i + 1) .isCloseTo(rateInMsg, Percentage.withPercentage(50)); - }); + } + } + } }); scheduledFuture.cancel(true); producersClose.close();
