This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 6af9aca Fix bugs in InflightLimiter and add tests (#83)
6af9aca is described below
commit 6af9aca9369d21b5d750fd4a319e06e5285f0f30
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 7 12:43:29 2022 +0200
Fix bugs in InflightLimiter and add tests (#83)
* Add tests for InflightLimiter
* Optimize maybeTriggerNext
* Prevent requestedDemand overflow when Long.MAX_VALUE is requested
* Fix maxRequest calculation
* Remove obsolete statements
* Reduce flakiness (3 seconds might not be sufficient)
---
.../client/internal/api/InflightLimiter.java | 84 ++++++----
.../client/api/ReactiveMessagePipelineTest.java | 2 +-
.../client/internal/api/InflightLimiterTest.java | 180 +++++++++++++++++----
3 files changed, 204 insertions(+), 62 deletions(-)
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
index 82a6fc2..feac6e6 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
@@ -16,6 +16,7 @@
package org.apache.pulsar.reactive.client.internal.api;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -36,6 +37,14 @@ import reactor.util.context.Context;
/**
* Transformer class that limits the number of reactive streams subscription
requests to
* keep the number of pending messages under a defined limit.
+ *
+ * Subscribing to upstream is postponed if the max inflight limit is reached
since
+ * subscribing to a CompletableFuture is eager. The CompletableFuture will be
created at
+ * subscription time and this demand cannot be controlled with Reactive
Stream's requests.
+ * The solution for this is to acquire one slot at subscription time and
return this slot
+ * when request is made to the subscription. Since it's not possible to
backpressure the
+ * subscription requests, there's a configurable limit for the total number of
pending
+ * subscriptions. Exceeding the limit will cause IllegalStateException at
runtime.
*/
public class InflightLimiter implements PublisherTransformer {
@@ -54,24 +63,32 @@ public class InflightLimiter implements
PublisherTransformer {
private final Scheduler.Worker triggerNextWorker;
+ private final AtomicBoolean triggerNextTriggered = new
AtomicBoolean(false);
+
/**
* Constructs an InflightLimiter with a maximum number of in-flight
messages.
* @param maxInflight the maximum number of in-flight messages
*/
public InflightLimiter(int maxInflight) {
- this(maxInflight, maxInflight, Schedulers.single(),
DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+ this(maxInflight, 0, Schedulers.single(),
DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
}
/**
* Constructs an InflightLimiter.
* @param maxInflight the maximum number of in-flight messages
- * @param expectedSubscriptionsInflight the expected number of
in-flight subscriptions
+ * @param expectedSubscriptionsInflight the expected number of in-flight
+ * subscriptions. Will limit the per-subscription requests to
maxInflight / max(active
+ * subscriptions, expectedSubscriptionsInflight). Set to 0 to use
active subscriptions
+ * in the calculation.
* @param triggerNextScheduler the scheduler on which it will be
checked if the
* subscriber can request more
* @param maxPendingSubscriptions the maximum number of pending
subscriptions
*/
public InflightLimiter(int maxInflight, int
expectedSubscriptionsInflight, Scheduler triggerNextScheduler,
int maxPendingSubscriptions) {
+ if (maxInflight < 1) {
+ throw new IllegalArgumentException("maxInflight must be
greater than 0");
+ }
this.maxInflight = maxInflight;
this.expectedSubscriptionsInflight =
expectedSubscriptionsInflight;
this.triggerNextWorker = triggerNextScheduler.createWorker();
@@ -116,21 +133,25 @@ public class InflightLimiter implements
PublisherTransformer {
}
void maybeTriggerNext() {
- if (!this.triggerNextWorker.isDisposed()) {
- this.triggerNextWorker.schedule(() -> {
- int remainingSubscriptions =
this.pendingSubscriptions.size();
- while (this.inflight.get() < this.maxInflight
&& remainingSubscriptions-- > 0) {
- InflightLimiterSubscriber<?> subscriber
= this.pendingSubscriptions.poll();
- if (subscriber != null) {
- if (!subscriber.isDisposed()) {
-
subscriber.requestMore();
+ if (!this.triggerNextWorker.isDisposed() && this.inflight.get()
< this.maxInflight
+ && !this.pendingSubscriptions.isEmpty()) {
+ if (this.triggerNextTriggered.compareAndSet(false,
true)) {
+ this.triggerNextWorker.schedule(() -> {
+ this.triggerNextTriggered.set(false);
+ int remainingSubscriptions =
this.pendingSubscriptions.size();
+ while (this.inflight.get() <
this.maxInflight && remainingSubscriptions-- > 0) {
+ InflightLimiterSubscriber<?>
subscriber = this.pendingSubscriptions.poll();
+ if (subscriber != null) {
+ if
(!subscriber.isDisposed()) {
+
subscriber.requestMore();
+ }
+ }
+ else {
+ break;
}
}
- else {
- break;
- }
- }
- });
+ });
+ }
}
}
@@ -177,7 +198,12 @@ public class InflightLimiter implements
PublisherTransformer {
private final Subscription subscription = new Subscription() {
@Override
public void request(long n) {
-
InflightLimiterSubscriber.this.requestedDemand.addAndGet(n);
+ if (n == Long.MAX_VALUE) {
+
InflightLimiterSubscriber.this.requestedDemand.set(n);
+ }
+ else if
(InflightLimiterSubscriber.this.requestedDemand.get() != Long.MAX_VALUE) {
+
InflightLimiterSubscriber.this.requestedDemand.addAndGet(n);
+ }
maybeAddToPending();
maybeTriggerNext();
}
@@ -248,16 +274,20 @@ public class InflightLimiter implements
PublisherTransformer {
}
void requestMore() {
- if (this.state.get() ==
InflightLimiterSubscriberState.SUBSCRIBED || (this.requestedDemand.get() > 0
- && this.inflightForSubscription.get()
<= InflightLimiter.this.expectedSubscriptionsInflight / 2
- && InflightLimiter.this.inflight.get()
< InflightLimiter.this.maxInflight)) {
+ // spread requests evenly across active subscriptions
(or expected number of
+ // subscriptions)
+ int maxInflightForSubscription = Math
+ .max(InflightLimiter.this.maxInflight /
Math.max(InflightLimiter.this.activeSubscriptions.get(),
+
InflightLimiter.this.expectedSubscriptionsInflight), 1);
+ if (this.requestedDemand.get() > 0 && (this.state.get()
== InflightLimiterSubscriberState.SUBSCRIBED
+ || (this.inflightForSubscription.get()
< maxInflightForSubscription
+ &&
InflightLimiter.this.inflight.get() < InflightLimiter.this.maxInflight))) {
if
(this.state.compareAndSet(InflightLimiterSubscriberState.INITIAL,
InflightLimiterSubscriberState.SUBSCRIBING)) {
// consume one slot for the
subscription, since the first element
// might already be in flight
// when a CompletableFuture is mapped
to a Mono
InflightLimiter.this.inflight.incrementAndGet();
- this.requestedDemand.decrementAndGet();
this.inflightForSubscription.incrementAndGet();
this.source.subscribe(InflightLimiterSubscriber.this);
}
@@ -270,20 +300,10 @@ public class InflightLimiter implements
PublisherTransformer {
// reverse the slot reservation
made when transitioning from
// INITIAL to SUBSCRIBING
InflightLimiter.this.inflight.decrementAndGet();
-
this.requestedDemand.incrementAndGet();
this.inflightForSubscription.decrementAndGet();
}
- long maxRequest = Math
- .max(Math.min(
-
Math.min(
-
Math.min(this.requestedDemand.get(),
-
InflightLimiter.this.maxInflight
-
- InflightLimiter.this.inflight.get()),
-
InflightLimiter.this.expectedSubscriptionsInflight
-
- this.inflightForSubscription.get()),
-
InflightLimiter.this.maxInflight
-
/ Math.max(InflightLimiter.this.activeSubscriptions.get(), 1)),
- 1);
+ long maxRequest =
Math.max(Math.min(this.requestedDemand.get(),
+
maxInflightForSubscription - this.inflightForSubscription.get()), 1);
InflightLimiter.this.inflight.addAndGet((int) maxRequest);
this.requestedDemand.addAndGet(-maxRequest);
this.inflightForSubscription.addAndGet((int) maxRequest);
diff --git
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
index 36f68a2..c6ff416 100644
---
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
+++
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
@@ -318,7 +318,7 @@ class ReactiveMessagePipelineTest {
try (ReactiveMessagePipeline pipeline =
testConsumer.messagePipeline().messageHandler(messageHandler2)
.concurrency(1000).maxInflight(maxInFlight).build()) {
pipeline.start();
- assertThat(latch.await(3, TimeUnit.SECONDS)).isTrue();
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(inflightCounter.getMax()).isEqualTo(maxInFlight);
}
}
diff --git
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
index 6de0ce7..6e5478d 100644
---
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
+++
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
@@ -16,50 +16,172 @@
package org.apache.pulsar.reactive.client.internal.api;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.time.Duration;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler.Worker;
import reactor.core.scheduler.Schedulers;
-import reactor.test.StepVerifier;
+import reactor.util.function.Tuple2;
import static org.assertj.core.api.Assertions.assertThat;
class InflightLimiterTest {
- @Test
- void shouldLimitInflight() {
- List<Integer> values = Collections.synchronizedList(new
ArrayList<>());
- InflightLimiter inflightLimiter = new InflightLimiter(48, 24,
Schedulers.single(),
+ @ParameterizedTest
+ @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+ void shouldNotRequestOrSubscribeMoreThanMaxInflightForMonos(int
maxInflight, int maxElements) {
+ InflightLimiter inflightLimiter = new
InflightLimiter(maxInflight, maxInflight, Schedulers.single(),
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
- Flux.merge(Arrays.asList(
- Flux.range(1,
100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator),
- Flux.range(101,
100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator),
- Flux.range(201,
100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator)))
-
.as(StepVerifier::create).expectSubscription().recordWith(() ->
values).expectNextCount(300)
- .expectComplete().verify();
- assertThat(values)
-
.containsExactlyInAnyOrderElementsOf(IntStream.range(1,
301).boxed().collect(Collectors.toList()));
- // verify "fairness"
- // TODO: this is flaky, fix it
- // verifyFairness(values);
+
+ AtomicLong totalRequests = new AtomicLong();
+ AtomicLong requestsMax = new AtomicLong();
+ AtomicInteger subscriptionsActiveBeforeCompletingFirstElement =
new AtomicInteger();
+ AtomicInteger subscriptionsMax = new AtomicInteger();
+
+ List<Integer> inputValues = IntStream.rangeClosed(1,
maxElements).boxed().collect(Collectors.toList());
+
+ Worker worker = Schedulers.boundedElastic().createWorker();
+ try {
+ Flux<Integer> flux =
Flux.fromIterable(inputValues).flatMap((i) -> {
+ AtomicLong currentRequests = new AtomicLong();
+ return
Mono.delay(Duration.ofMillis(25L)).thenReturn(i).doOnSubscribe((subscription)
-> {
+ worker.schedule(() -> {
+ int currentSubscriptionsCount =
subscriptionsActiveBeforeCompletingFirstElement
+
.incrementAndGet();
+
subscriptionsMax.accumulateAndGet(currentSubscriptionsCount, Math::max);
+ });
+ }).doOnRequest((requested) -> {
+ worker.schedule(() -> {
+
currentRequests.addAndGet(requested);
+ long current =
totalRequests.addAndGet(requested);
+
requestsMax.accumulateAndGet(current, Math::max);
+ });
+ }).doOnNext((__) -> {
+ worker.schedule(() -> {
+
subscriptionsActiveBeforeCompletingFirstElement.decrementAndGet();
+ // Mono will complete after
this element, so reduce all remaining
+ // requests;
+
totalRequests.addAndGet(-currentRequests.getAndSet(0));
+ });
+
}).transform(inflightLimiter::transform).subscribeOn(Schedulers.single());
+ }, maxElements);
+ List<Integer> values = flux.collectList().block();
+ SoftAssertions.assertSoftly((softly) -> {
+
softly.assertThat(values).containsExactlyInAnyOrderElementsOf(inputValues);
+
softly.assertThat(requestsMax.get()).as("requestsMax").isEqualTo(maxInflight);
+
softly.assertThat(subscriptionsMax.get()).as("subscriptionsMax").isEqualTo(maxInflight);
+ });
+ }
+ finally {
+ worker.dispose();
+ }
+ }
+
+ @ParameterizedTest
+ @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+ void shouldNotSubscribeMoreThanMaxInflightForMonos(int maxInflight, int
maxElements) {
+ InflightLimiter inflightLimiter = new
InflightLimiter(maxInflight, maxInflight, Schedulers.single(),
+
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+
+ AtomicInteger completableFuturesInflight = new AtomicInteger();
+ AtomicInteger completableFuturesInflightMax = new
AtomicInteger();
+
+ List<Integer> inputValues = IntStream.rangeClosed(1,
maxElements).boxed().collect(Collectors.toList());
+
+ Flux<Integer> flux = Flux.fromIterable(inputValues).flatMap((i)
-> Mono.fromCompletionStage(() -> {
+
completableFuturesInflightMax.accumulateAndGet(completableFuturesInflight.incrementAndGet(),
Math::max);
+ CompletableFuture<Integer> future = new
CompletableFuture<>();
+ Schedulers.boundedElastic().schedule(() -> {
+ completableFuturesInflight.decrementAndGet();
+ future.complete(i);
+ }, 5, TimeUnit.MILLISECONDS);
+ return future;
+
}).transform(inflightLimiter::transform).subscribeOn(Schedulers.parallel()),
maxElements);
+
+ List<Integer> values = flux.collectList().block();
+
assertThat(values).containsExactlyInAnyOrderElementsOf(inputValues);
+
assertThat(completableFuturesInflightMax.get()).isEqualTo(maxInflight);
}
- private void verifyFairness(List<Integer> values) {
- int previousValue = 1;
- for (int i = 0; i < values.size(); i++) {
- int value = values.get(i) % 100;
- if (value == 0) {
- value = 100;
- }
- assertThat(Math.abs(previousValue - value)).as("value
%d at index %d", values.get(i), i)
- .isLessThanOrEqualTo(48);
- previousValue = value;
+ @Disabled("This test was used to debug the flakiness issue")
+ @RepeatedTest(100)
+ void repeatShouldNotSubscribeMoreThanMaxInflightForMonos() {
+ shouldNotSubscribeMoreThanMaxInflightForMonos(7, 100);
+ shouldNotRequestOrSubscribeMoreThanMaxInflightForMonos(7, 100);
+ shouldNotRequestOrSubscribeMoreThanMaxInflightForFluxes(7, 100);
+ }
+
+ @ParameterizedTest
+ @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+ void shouldNotRequestOrSubscribeMoreThanMaxInflightForFluxes(int
maxInflight, int maxElements) {
+ int subfluxSize = 3;
+
+ InflightLimiter inflightLimiter = new
InflightLimiter(maxInflight, maxInflight, Schedulers.single(),
+
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+
+ AtomicLong totalRequests = new AtomicLong();
+ AtomicLong requestsMax = new AtomicLong();
+ AtomicInteger subscriptionsActiveBeforeCompletingFirstElement =
new AtomicInteger();
+ AtomicInteger subscriptionsMax = new AtomicInteger();
+
+ List<Integer> inputValues = IntStream.rangeClosed(1,
maxElements).boxed().collect(Collectors.toList());
+
+ Flux<Integer> flux =
Flux.fromIterable(inputValues).window(subfluxSize).flatMap((subFlux) -> {
+ AtomicLong currentRequests = new AtomicLong();
+ return subFlux.flatMap((i) ->
Mono.delay(Duration.ofMillis(25L)).thenReturn(i)).index()
+ .doOnSubscribe((subscription) -> {
+ int currentSubscriptionsCount =
subscriptionsActiveBeforeCompletingFirstElement
+
.incrementAndGet();
+
subscriptionsMax.accumulateAndGet(currentSubscriptionsCount, Math::max);
+ }).doOnRequest((requested) -> {
+
currentRequests.addAndGet(requested);
+ long current =
totalRequests.addAndGet(requested);
+
requestsMax.accumulateAndGet(current, Math::max);
+ }).doOnNext((indexed) -> {
+
currentRequests.decrementAndGet();
+ totalRequests.decrementAndGet();
+ if (indexed.getT1() == 0L) {
+
subscriptionsActiveBeforeCompletingFirstElement.decrementAndGet();
+ }
+ }).doFinally((__) ->
totalRequests.addAndGet(-currentRequests.getAndSet(0)))
+
.transform(inflightLimiter::transform).subscribeOn(Schedulers.parallel());
+ }, maxElements).map(Tuple2::getT2);
+
+ List<Integer> values = flux.collectList().block();
+
assertThat(values).containsExactlyInAnyOrderElementsOf(inputValues);
+ assertThat(requestsMax.get()).isEqualTo(maxInflight);
+ assertThat(subscriptionsMax.get()).isEqualTo(maxInflight);
+ }
+
+ @Test
+ void shouldSpreadRequestsEvenlyAcrossUpstream() {
+ // Three upstream ranges (1-100, 101-200, 201-300) share one limiter with maxInflight = 1.
+ InflightLimiter inflightLimiter = new InflightLimiter(1, 1, Schedulers.single(),
+ InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+ List<Integer> values = Flux
+ .merge(100, Flux.range(1, 100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator),
+ Flux.range(101, 100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator),
+ Flux.range(201, 100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator))
+ .collectList().block();
+ // Every input element must be emitted exactly once, in any order.
+ assertThat(values).containsExactlyInAnyOrderElementsOf(
+ IntStream.rangeClosed(1, 300).boxed().collect(Collectors.toList()));
+ // Each checked triple is expected to hold one element from each upstream: x, x + 100, x + 200.
+ for (int i = 0; i < 100; i = i + 3) {
+ // as() has to be called before the assertion; a description set afterwards is
+ // ignored and would not show up in the failure message.
+ assertThat(new int[] { values.get(i), values.get(i + 1), values.get(i + 2) })
+ .as("values at index %d-%d", i, i + 2)
+ .containsExactly(values.get(i), values.get(i) + 100, values.get(i) + 200);
}
}