This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2d7ce2600f7fd162a7b287e5264150336e1917d5
Author: sinan liu <[email protected]>
AuthorDate: Fri Oct 17 19:33:22 2025 +0800

    [fix][test] Stabilize PublishRateLimiterOverconsumingTest by aligning 
measurement and using adjacent 2-sec averages (#24864)
    
    (cherry picked from commit 562d4463ecc5ac15cb0d9fbe380fd49664530a3e)
---
 .../PublishRateLimiterOverconsumingTest.java       | 72 +++++++++++++++-------
 1 file changed, 50 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java
index fe30e7c7873..866df645d3f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterOverconsumingTest.java
@@ -79,7 +79,8 @@ public class PublishRateLimiterOverconsumingTest extends 
BrokerTestBase {
         int durationSeconds = 5;
         int numberOfConsumers = 5;
         int numberOfProducersWithIndependentClients = 5;
-        int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds + 
1)) / 5;
+        int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds + 
1))
+                / numberOfProducersWithIndependentClients;
 
         // configure dispatch throttling rate
         BrokerService brokerService = pulsar.getBrokerService();
@@ -102,12 +103,12 @@ public class PublishRateLimiterOverconsumingTest extends 
BrokerTestBase {
         AtomicInteger lastCalculatedSecond = new AtomicInteger(0);
         List<Integer> collectedRatesForEachSecond = 
Collections.synchronizedList(new ArrayList<>());
 
-        // track actual consuming rate of messages per second
+        // track actual consuming rate of messages per second
+        // (After aligning the first message, start counting whole seconds)
         Runnable rateTracker = () -> {
             long startTime = startTimeNanos.get();
             if (startTime == 0) {
-                startTimeNanos.compareAndSet(0, System.nanoTime());
-                startTime = startTimeNanos.get();
+                return;
             }
             long durationNanos = System.nanoTime() - startTime;
             int elapsedFullSeconds = (int) (durationNanos / 1e9);
@@ -124,10 +125,15 @@ public class PublishRateLimiterOverconsumingTest extends 
BrokerTestBase {
         ScheduledFuture<?> scheduledFuture = 
executor.scheduleAtFixedRate(rateTracker, 0, 500, TimeUnit.MILLISECONDS);
 
         // message listener implementation used for all consumers
+        // Set startTime when the first message arrives; then accumulate the 
counter
         MessageListener<Integer> messageListener = new MessageListener<>() {
             @Override
             public void received(Consumer<Integer> consumer, Message<Integer> 
msg) {
-                lastReceivedMessageTimeNanos.set(System.nanoTime());
+                long now = System.nanoTime();
+                if (startTimeNanos.get() == 0) {
+                    startTimeNanos.compareAndSet(0, now);
+                }
+                lastReceivedMessageTimeNanos.set(now);
                 currentSecondMessagesCount.incrementAndGet();
                 totalMessagesReceived.incrementAndGet();
                 consumer.acknowledgeAsync(msg);
@@ -233,33 +239,55 @@ public class PublishRateLimiterOverconsumingTest extends 
BrokerTestBase {
             });
         };
 
-        // wait for results
+        // Wait for all messages to be consumed
         Awaitility.await()
                 .atMost(Duration.ofSeconds(durationSeconds * 2))
                 .pollInterval(Duration.ofMillis(100))
-                .untilAsserted(
-                        () -> 
assertThat(collectedRatesForEachSecond).hasSizeGreaterThanOrEqualTo(durationSeconds));
+                .untilAsserted(() ->
+                        assertThat(totalMessagesReceived.get())
+                                
.isEqualTo(numberOfProducersWithIndependentClients * 
numberOfMessagesForEachProducer));
+
+        // Collect per-second windows, and append any messages counted since the last recorded window
         List<Integer> collectedRatesSnapshot = new 
ArrayList<>(collectedRatesForEachSecond);
+        int tail = currentSecondMessagesCount.getAndSet(0);
+        if (tail > 0) {
+            collectedRatesSnapshot.add(tail);
+        }
         log.info("Collected rates for each second: {}", 
collectedRatesSnapshot);
-        long avgMsgRate =
-                totalMessagesReceived.get() / TimeUnit.NANOSECONDS.toSeconds(
-                        lastReceivedMessageTimeNanos.get() - 
startTimeNanos.get());
-        log.info("Average rate during the test run: {} msg/s", avgMsgRate);
+
+        // Calculate the average using second-by-second windows:
+        // Skip the first second, take up to durationSeconds windows.
+        int usable = Math.min(durationSeconds, Math.max(0, 
collectedRatesSnapshot.size() - 1));
+        double windowedAvg = (usable > 0)
+                ? collectedRatesSnapshot.subList(1, 1 + 
usable).stream().mapToInt(Integer::intValue).average().orElse(0)
+                : 0;
 
         assertSoftly(softly -> {
-            // check the rate during the test run
-            softly.assertThat(avgMsgRate).describedAs("average rate during the 
test run")
-                    // allow rate in 40% range
+            // Overall average (window mean, ±40%)
+            softly.assertThat(windowedAvg)
+                    .describedAs("windowed average rate during the test run")
                     .isCloseTo(rateInMsg, Percentage.withPercentage(40));
 
-            // check that rates were collected
-            // skip the first element as it might contain messages for first 2 
seconds
-            softly.assertThat(collectedRatesSnapshot.subList(1, 
collectedRatesSnapshot.size() - 1))
-                    .describedAs("actual rates for each second")
-                    .allSatisfy(rates -> {
-                        assertThat(rates).describedAs("actual rate for each 
second")
+            // Per second (average of two adjacent seconds, skipping the first window, ±55%)
+            if (collectedRatesSnapshot.size() >= 4) {
+                // Starting from (1, 2), ending at (size-2, size-1)
+                for (int i = 2; i < collectedRatesSnapshot.size(); i++) {
+                    int avg2 = (collectedRatesSnapshot.get(i - 1) + 
collectedRatesSnapshot.get(i)) / 2;
+                    softly.assertThat(avg2)
+                            .describedAs("Average of second %d and %d", i, i + 
1)
+                            .isCloseTo(rateInMsg, 
Percentage.withPercentage(55));
+                }
+            } else {
+                // Degenerates to: Skip head and tail with 50% tolerance 
(avoid false positives)
+                // when there are too few windows
+                if (collectedRatesSnapshot.size() > 2) {
+                    for (int i = 1; i < collectedRatesSnapshot.size() - 1; 
i++) {
+                        softly.assertThat(collectedRatesSnapshot.get(i))
+                                .describedAs("rate of second %d (fallback 
check)", i + 1)
                                 .isCloseTo(rateInMsg, 
Percentage.withPercentage(50));
-                    });
+                    }
+                }
+            }
         });
         scheduledFuture.cancel(true);
         producersClose.close();

Reply via email to