This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e84e3c610233b56f68b9417f8dfc35d185b941ad
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 18:08:38 2026 -0700

    [fix][test] Fix flaky MessagePublishBufferThrottleTest.testBlockByPublishRateLimiting (#25365)
    
    (cherry picked from commit a3ae70545f3aef40e8f5cfcced907e6500b08894)
---
 .../service/MessagePublishBufferThrottleTest.java  | 60 ++++++++++++++--------
 1 file changed, 39 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 0faae14da08..6a74df3eecc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.broker.service;
 
 import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
-import static org.testng.Assert.fail;
-import java.util.concurrent.CompletableFuture;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -134,33 +133,32 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
 
         pulsarTestContext.getMockBookKeeper().addEntryDelay(5, 
TimeUnit.SECONDS);
 
-        // Block by publish buffer.
+        // Block by publish buffer: 10 x 1MB messages with a 1MB buffer limit.
         byte[] payload = new byte[1024 * 1024];
         for (int i = 0; i < 10; i++) {
             producer.sendAsync(payload);
         }
 
-        Awaitility.await().untilAsserted(() -> 
assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1));
+        // Wait for at least one pause event to be recorded.
+        Awaitility.await().untilAsserted(
+                () -> 
assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName.PAUSED, 1));
 
-        CompletableFuture<Void> flushFuture = producer.flushAsync();
+        // Verify that no resume has happened yet while messages are still 
blocked.
+        Awaitility.await().untilAsserted(
+                () -> 
assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0));
 
-        // Block by publish rate.
-        // After 1 second, the message buffer throttling will be lifted, but 
the rate limiting will still be in place.
-        assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1);
-        assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
-
-        try {
-            flushFuture.get(2, TimeUnit.SECONDS);
-            fail("Should have timed out");
-        } catch (TimeoutException e) {
-            // Ok
-        }
-
-        flushFuture.join();
+        // Flush and wait for all messages to complete.
+        producer.flush();
 
+        // After all messages are sent, the number of pauses and resumes 
should match:
+        // every pause must eventually be followed by a resume.
         Awaitility.await().untilAsserted(() -> {
-            assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 
10);
-            assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 
10);
+            var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+            long pausedCount = getMetricLongSumValue(metrics, 
ConnectionRateLimitOperationName.PAUSED);
+            long resumedCount = getMetricLongSumValue(metrics, 
ConnectionRateLimitOperationName.RESUMED);
+            Assert.assertTrue(pausedCount > 0, "Expected at least one pause 
event");
+            Assert.assertEquals(pausedCount, resumedCount,
+                    "Paused and resumed counts should match after all messages 
are sent");
         });
     }
 
@@ -206,4 +204,24 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
                     connectionRateLimitState.attributes, expectedCount);
         }
     }
+
+    private void 
assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName 
connectionRateLimitState,
+                                               int minExpectedCount) {
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+                connectionRateLimitState.attributes,
+                actual -> 
assertThat(actual).isGreaterThanOrEqualTo(minExpectedCount));
+    }
+
+    private long getMetricLongSumValue(Collection<MetricData> metrics,
+                                       ConnectionRateLimitOperationName 
connectionRateLimitState) {
+        var attributesMap = connectionRateLimitState.attributes.asMap();
+        return metrics.stream()
+                .filter(m -> 
m.getName().equals(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME))
+                .flatMap(m -> m.getLongSumData().getPoints().stream())
+                .filter(point -> 
point.getAttributes().asMap().equals(attributesMap))
+                
.mapToLong(io.opentelemetry.sdk.metrics.data.LongPointData::getValue)
+                .findFirst()
+                .orElse(0L);
+    }
 }

Reply via email to