This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e84e3c610233b56f68b9417f8dfc35d185b941ad Author: Matteo Merli <[email protected]> AuthorDate: Thu Mar 19 18:08:38 2026 -0700 [fix][test] Fix flaky MessagePublishBufferThrottleTest.testBlockByPublishRateLimiting (#25365) (cherry picked from commit a3ae70545f3aef40e8f5cfcced907e6500b08894) --- .../service/MessagePublishBufferThrottleTest.java | 60 ++++++++++++++-------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java index 0faae14da08..6a74df3eecc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java @@ -20,10 +20,9 @@ package org.apache.pulsar.broker.service; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; -import static org.testng.Assert.fail; -import java.util.concurrent.CompletableFuture; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.Collection; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.testcontext.PulsarTestContext; @@ -134,33 +133,32 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase { pulsarTestContext.getMockBookKeeper().addEntryDelay(5, TimeUnit.SECONDS); - // Block by publish buffer. + // Block by publish buffer: 10 x 1MB messages with a 1MB buffer limit. 
byte[] payload = new byte[1024 * 1024]; for (int i = 0; i < 10; i++) { producer.sendAsync(payload); } - Awaitility.await().untilAsserted(() -> assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1)); + // Wait for at least one pause event to be recorded. + Awaitility.await().untilAsserted( + () -> assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName.PAUSED, 1)); - CompletableFuture<Void> flushFuture = producer.flushAsync(); + // Verify that no resume has happened yet while messages are still blocked. + Awaitility.await().untilAsserted( + () -> assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0)); - // Block by publish rate. - // After 1 second, the message buffer throttling will be lifted, but the rate limiting will still be in place. - assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1); - assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0); - - try { - flushFuture.get(2, TimeUnit.SECONDS); - fail("Should have timed out"); - } catch (TimeoutException e) { - // Ok - } - - flushFuture.join(); + // Flush and wait for all messages to complete. + producer.flush(); + // After all messages are sent, the number of pauses and resumes should match: + // every pause must eventually be followed by a resume. 
Awaitility.await().untilAsserted(() -> { - assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 10); - assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 10); + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + long pausedCount = getMetricLongSumValue(metrics, ConnectionRateLimitOperationName.PAUSED); + long resumedCount = getMetricLongSumValue(metrics, ConnectionRateLimitOperationName.RESUMED); + Assert.assertTrue(pausedCount > 0, "Expected at least one pause event"); + Assert.assertEquals(pausedCount, resumedCount, + "Paused and resumed counts should match after all messages are sent"); }); } @@ -206,4 +204,24 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase { connectionRateLimitState.attributes, expectedCount); } } + + private void assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName connectionRateLimitState, + int minExpectedCount) { + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME, + connectionRateLimitState.attributes, + actual -> assertThat(actual).isGreaterThanOrEqualTo(minExpectedCount)); + } + + private long getMetricLongSumValue(Collection<MetricData> metrics, + ConnectionRateLimitOperationName connectionRateLimitState) { + var attributesMap = connectionRateLimitState.attributes.asMap(); + return metrics.stream() + .filter(m -> m.getName().equals(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME)) + .flatMap(m -> m.getLongSumData().getPoints().stream()) + .filter(point -> point.getAttributes().asMap().equals(attributesMap)) + .mapToLong(io.opentelemetry.sdk.metrics.data.LongPointData::getValue) + .findFirst() + .orElse(0L); + } }
