This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 0d574cc Reduce flakiness for shouldSpreadRequestsEvenlyAcrossUpstream (#92)
0d574cc is described below
commit 0d574cc753ad0fe390459d2f884223be6a617ab1
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 7 17:22:25 2022 +0200
Reduce flakiness for shouldSpreadRequestsEvenlyAcrossUpstream (#92)
---
.../pulsar/reactive/client/internal/api/InflightLimiterTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
index 6e5478d..dac3e1e 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
@@ -169,12 +169,12 @@ class InflightLimiterTest {
@Test
void shouldSpreadRequestsEvenlyAcrossUpstream() {
- InflightLimiter inflightLimiter = new InflightLimiter(1, 1,
Schedulers.single(),
+ InflightLimiter inflightLimiter = new InflightLimiter(1, 0,
Schedulers.single(),
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
List<Integer> values = Flux
- .merge(100, Flux.range(1,
100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator),
- Flux.range(101,
100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator),
- Flux.range(201,
100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator))
+ .merge(100, Flux.range(1,
100).delayElements(Duration.ofMillis(3)).as(inflightLimiter::createOperator),
+ Flux.range(101,
100).delayElements(Duration.ofMillis(3)).as(inflightLimiter::createOperator),
+ Flux.range(201,
100).delayElements(Duration.ofMillis(3)).as(inflightLimiter::createOperator))
.collectList().block();
assertThat(values).containsExactlyInAnyOrderElementsOf(
IntStream.rangeClosed(1,
300).boxed().collect(Collectors.toList()));