This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 6af9aca  Fix bugs in InflightLimiter and add tests (#83)
6af9aca is described below

commit 6af9aca9369d21b5d750fd4a319e06e5285f0f30
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 7 12:43:29 2022 +0200

    Fix bugs in InflightLimiter and add tests (#83)
    
    * Add tests for InflightLimiter
    
    * Optimize maybeTriggerNext
    
    * Prevent requestedDemand overflow when Long.MAX_VALUE is requested
    
    * Fix maxRequest calculation
    
    * Remove obsolete statements
    
    * Reduce flakiness (3 seconds might not be sufficient)
---
 .../client/internal/api/InflightLimiter.java       |  84 ++++++----
 .../client/api/ReactiveMessagePipelineTest.java    |   2 +-
 .../client/internal/api/InflightLimiterTest.java   | 180 +++++++++++++++++----
 3 files changed, 204 insertions(+), 62 deletions(-)

diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
index 82a6fc2..feac6e6 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java
@@ -16,6 +16,7 @@
 
 package org.apache.pulsar.reactive.client.internal.api;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -36,6 +37,14 @@ import reactor.util.context.Context;
 /**
  * Transformer class that limits the number of reactive streams subscription 
requests to
  * keep the number of pending messages under a defined limit.
+ *
+ * Subscribing to upstream is postponed if the max inflight limit is reached 
since
+ * subscribing to a CompletableFuture is eager. The CompetableFuture will be 
created at
+ * subscription time and this demand cannot be controlled with Reactive 
Stream's requests.
+ * The solution for this is to acquire one slot at subscription time and 
return this slot
+ * when request is made to the subscription. Since it's not possible to 
backpressure the
+ * subscription requests, there's a configurable limit for the total number of 
pending
+ * subscriptions. Exceeding the limit will cause IllegalStateException at 
runtime.
  */
 public class InflightLimiter implements PublisherTransformer {
 
@@ -54,24 +63,32 @@ public class InflightLimiter implements 
PublisherTransformer {
 
        private final Scheduler.Worker triggerNextWorker;
 
+       private final AtomicBoolean triggerNextTriggered = new 
AtomicBoolean(false);
+
        /**
         * Constructs an InflightLimiter with a maximum number of in-flight 
messages.
         * @param maxInflight the maximum number of in-flight messages
         */
        public InflightLimiter(int maxInflight) {
-               this(maxInflight, maxInflight, Schedulers.single(), 
DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+               this(maxInflight, 0, Schedulers.single(), 
DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
        }
 
        /**
         * Constructs an InflightLimiter.
         * @param maxInflight the maximum number of in-flight messages
-        * @param expectedSubscriptionsInflight the expected number of 
in-flight subscriptions
+        * @param expectedSubscriptionsInflight the expected number of in-flight
+        * subscriptions. Will limit the per-subscription requests to 
maxInflight / max(active
+        * subscriptions, expectedSubscriptionsInflight). Set to 0 to use 
active subscriptions
+        * in the calculation.
         * @param triggerNextScheduler the scheduler on which it will be 
checked if the
         * subscriber can request more
         * @param maxPendingSubscriptions the maximum number of pending 
subscriptions
         */
        public InflightLimiter(int maxInflight, int 
expectedSubscriptionsInflight, Scheduler triggerNextScheduler,
                        int maxPendingSubscriptions) {
+               if (maxInflight < 1) {
+                       throw new IllegalArgumentException("maxInflight must be 
greater than 0");
+               }
                this.maxInflight = maxInflight;
                this.expectedSubscriptionsInflight = 
expectedSubscriptionsInflight;
                this.triggerNextWorker = triggerNextScheduler.createWorker();
@@ -116,21 +133,25 @@ public class InflightLimiter implements 
PublisherTransformer {
        }
 
        void maybeTriggerNext() {
-               if (!this.triggerNextWorker.isDisposed()) {
-                       this.triggerNextWorker.schedule(() -> {
-                               int remainingSubscriptions = 
this.pendingSubscriptions.size();
-                               while (this.inflight.get() < this.maxInflight 
&& remainingSubscriptions-- > 0) {
-                                       InflightLimiterSubscriber<?> subscriber 
= this.pendingSubscriptions.poll();
-                                       if (subscriber != null) {
-                                               if (!subscriber.isDisposed()) {
-                                                       
subscriber.requestMore();
+               if (!this.triggerNextWorker.isDisposed() && this.inflight.get() 
< this.maxInflight
+                               && !this.pendingSubscriptions.isEmpty()) {
+                       if (this.triggerNextTriggered.compareAndSet(false, 
true)) {
+                               this.triggerNextWorker.schedule(() -> {
+                                       this.triggerNextTriggered.set(false);
+                                       int remainingSubscriptions = 
this.pendingSubscriptions.size();
+                                       while (this.inflight.get() < 
this.maxInflight && remainingSubscriptions-- > 0) {
+                                               InflightLimiterSubscriber<?> 
subscriber = this.pendingSubscriptions.poll();
+                                               if (subscriber != null) {
+                                                       if 
(!subscriber.isDisposed()) {
+                                                               
subscriber.requestMore();
+                                                       }
+                                               }
+                                               else {
+                                                       break;
                                                }
                                        }
-                                       else {
-                                               break;
-                                       }
-                               }
-                       });
+                               });
+                       }
                }
        }
 
@@ -177,7 +198,12 @@ public class InflightLimiter implements 
PublisherTransformer {
                private final Subscription subscription = new Subscription() {
                        @Override
                        public void request(long n) {
-                               
InflightLimiterSubscriber.this.requestedDemand.addAndGet(n);
+                               if (n == Long.MAX_VALUE) {
+                                       
InflightLimiterSubscriber.this.requestedDemand.set(n);
+                               }
+                               else if 
(InflightLimiterSubscriber.this.requestedDemand.get() != Long.MAX_VALUE) {
+                                       
InflightLimiterSubscriber.this.requestedDemand.addAndGet(n);
+                               }
                                maybeAddToPending();
                                maybeTriggerNext();
                        }
@@ -248,16 +274,20 @@ public class InflightLimiter implements 
PublisherTransformer {
                }
 
                void requestMore() {
-                       if (this.state.get() == 
InflightLimiterSubscriberState.SUBSCRIBED || (this.requestedDemand.get() > 0
-                                       && this.inflightForSubscription.get() 
<= InflightLimiter.this.expectedSubscriptionsInflight / 2
-                                       && InflightLimiter.this.inflight.get() 
< InflightLimiter.this.maxInflight)) {
+                       // spread requests evenly across active subscriptions 
(or expected number of
+                       // subscriptions)
+                       int maxInflightForSubscription = Math
+                                       .max(InflightLimiter.this.maxInflight / 
Math.max(InflightLimiter.this.activeSubscriptions.get(),
+                                                       
InflightLimiter.this.expectedSubscriptionsInflight), 1);
+                       if (this.requestedDemand.get() > 0 && (this.state.get() 
== InflightLimiterSubscriberState.SUBSCRIBED
+                                       || (this.inflightForSubscription.get() 
< maxInflightForSubscription
+                                                       && 
InflightLimiter.this.inflight.get() < InflightLimiter.this.maxInflight))) {
                                if 
(this.state.compareAndSet(InflightLimiterSubscriberState.INITIAL,
                                                
InflightLimiterSubscriberState.SUBSCRIBING)) {
                                        // consume one slot for the 
subscription, since the first element
                                        // might already be in flight
                                        // when a CompletableFuture is mapped 
to a Mono
                                        
InflightLimiter.this.inflight.incrementAndGet();
-                                       this.requestedDemand.decrementAndGet();
                                        
this.inflightForSubscription.incrementAndGet();
                                        
this.source.subscribe(InflightLimiterSubscriber.this);
                                }
@@ -270,20 +300,10 @@ public class InflightLimiter implements 
PublisherTransformer {
                                                // reverse the slot reservation 
made when transitioning from
                                                // INITIAL to SUBSCRIBING
                                                
InflightLimiter.this.inflight.decrementAndGet();
-                                               
this.requestedDemand.incrementAndGet();
                                                
this.inflightForSubscription.decrementAndGet();
                                        }
-                                       long maxRequest = Math
-                                                       .max(Math.min(
-                                                                       
Math.min(
-                                                                               
        Math.min(this.requestedDemand.get(),
-                                                                               
                        InflightLimiter.this.maxInflight
-                                                                               
                                        - InflightLimiter.this.inflight.get()),
-                                                                               
        InflightLimiter.this.expectedSubscriptionsInflight
-                                                                               
                        - this.inflightForSubscription.get()),
-                                                                       
InflightLimiter.this.maxInflight
-                                                                               
        / Math.max(InflightLimiter.this.activeSubscriptions.get(), 1)),
-                                                                       1);
+                                       long maxRequest = 
Math.max(Math.min(this.requestedDemand.get(),
+                                                       
maxInflightForSubscription - this.inflightForSubscription.get()), 1);
                                        
InflightLimiter.this.inflight.addAndGet((int) maxRequest);
                                        
this.requestedDemand.addAndGet(-maxRequest);
                                        
this.inflightForSubscription.addAndGet((int) maxRequest);
diff --git 
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
 
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
index 36f68a2..c6ff416 100644
--- 
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
+++ 
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
@@ -318,7 +318,7 @@ class ReactiveMessagePipelineTest {
                try (ReactiveMessagePipeline pipeline = 
testConsumer.messagePipeline().messageHandler(messageHandler2)
                                
.concurrency(1000).maxInflight(maxInFlight).build()) {
                        pipeline.start();
-                       assertThat(latch.await(3, TimeUnit.SECONDS)).isTrue();
+                       assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
                        
assertThat(inflightCounter.getMax()).isEqualTo(maxInFlight);
                }
        }
diff --git 
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
 
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
index 6de0ce7..6e5478d 100644
--- 
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
+++ 
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java
@@ -16,50 +16,172 @@
 
 package org.apache.pulsar.reactive.client.internal.api;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler.Worker;
 import reactor.core.scheduler.Schedulers;
-import reactor.test.StepVerifier;
+import reactor.util.function.Tuple2;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 class InflightLimiterTest {
 
-       @Test
-       void shouldLimitInflight() {
-               List<Integer> values = Collections.synchronizedList(new 
ArrayList<>());
-               InflightLimiter inflightLimiter = new InflightLimiter(48, 24, 
Schedulers.single(),
+       @ParameterizedTest
+       @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+       void shouldNotRequestOrSubscribeMoreThanMaxInflightForMonos(int 
maxInflight, int maxElements) {
+               InflightLimiter inflightLimiter = new 
InflightLimiter(maxInflight, maxInflight, Schedulers.single(),
                                
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
-               Flux.merge(Arrays.asList(
-                               Flux.range(1, 
100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator),
-                               Flux.range(101, 
100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator),
-                               Flux.range(201, 
100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator)))
-                               
.as(StepVerifier::create).expectSubscription().recordWith(() -> 
values).expectNextCount(300)
-                               .expectComplete().verify();
-               assertThat(values)
-                               
.containsExactlyInAnyOrderElementsOf(IntStream.range(1, 
301).boxed().collect(Collectors.toList()));
-               // verify "fairness"
-               // TODO: this is flaky, fix it
-               // verifyFairness(values);
+
+               AtomicLong totalRequests = new AtomicLong();
+               AtomicLong requestsMax = new AtomicLong();
+               AtomicInteger subscriptionsActiveBeforeCompletingFirstElement = 
new AtomicInteger();
+               AtomicInteger subscriptionsMax = new AtomicInteger();
+
+               List<Integer> inputValues = IntStream.rangeClosed(1, 
maxElements).boxed().collect(Collectors.toList());
+
+               Worker worker = Schedulers.boundedElastic().createWorker();
+               try {
+                       Flux<Integer> flux = 
Flux.fromIterable(inputValues).flatMap((i) -> {
+                               AtomicLong currentRequests = new AtomicLong();
+                               return 
Mono.delay(Duration.ofMillis(25L)).thenReturn(i).doOnSubscribe((subscription) 
-> {
+                                       worker.schedule(() -> {
+                                               int currentSubscriptionsCount = 
subscriptionsActiveBeforeCompletingFirstElement
+                                                               
.incrementAndGet();
+                                               
subscriptionsMax.accumulateAndGet(currentSubscriptionsCount, Math::max);
+                                       });
+                               }).doOnRequest((requested) -> {
+                                       worker.schedule(() -> {
+                                               
currentRequests.addAndGet(requested);
+                                               long current = 
totalRequests.addAndGet(requested);
+                                               
requestsMax.accumulateAndGet(current, Math::max);
+                                       });
+                               }).doOnNext((__) -> {
+                                       worker.schedule(() -> {
+                                               
subscriptionsActiveBeforeCompletingFirstElement.decrementAndGet();
+                                               // Mono will complete after 
this element, so reduce all remaining
+                                               // requests;
+                                               
totalRequests.addAndGet(-currentRequests.getAndSet(0));
+                                       });
+                               
}).transform(inflightLimiter::transform).subscribeOn(Schedulers.single());
+                       }, maxElements);
+                       List<Integer> values = flux.collectList().block();
+                       SoftAssertions.assertSoftly((softly) -> {
+                               
softly.assertThat(values).containsExactlyInAnyOrderElementsOf(inputValues);
+                               
softly.assertThat(requestsMax.get()).as("requestsMax").isEqualTo(maxInflight);
+                               
softly.assertThat(subscriptionsMax.get()).as("subscriptionsMax").isEqualTo(maxInflight);
+                       });
+               }
+               finally {
+                       worker.dispose();
+               }
+       }
+
+       @ParameterizedTest
+       @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+       void shouldNotSubscribeMoreThanMaxInflightForMonos(int maxInflight, int 
maxElements) {
+               InflightLimiter inflightLimiter = new 
InflightLimiter(maxInflight, maxInflight, Schedulers.single(),
+                               
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+
+               AtomicInteger completableFuturesInflight = new AtomicInteger();
+               AtomicInteger completableFuturesInflightMax = new 
AtomicInteger();
+
+               List<Integer> inputValues = IntStream.rangeClosed(1, 
maxElements).boxed().collect(Collectors.toList());
+
+               Flux<Integer> flux = Flux.fromIterable(inputValues).flatMap((i) 
-> Mono.fromCompletionStage(() -> {
+                       
completableFuturesInflightMax.accumulateAndGet(completableFuturesInflight.incrementAndGet(),
 Math::max);
+                       CompletableFuture<Integer> future = new 
CompletableFuture<>();
+                       Schedulers.boundedElastic().schedule(() -> {
+                               completableFuturesInflight.decrementAndGet();
+                               future.complete(i);
+                       }, 5, TimeUnit.MILLISECONDS);
+                       return future;
+               
}).transform(inflightLimiter::transform).subscribeOn(Schedulers.parallel()), 
maxElements);
+
+               List<Integer> values = flux.collectList().block();
+               
assertThat(values).containsExactlyInAnyOrderElementsOf(inputValues);
+               
assertThat(completableFuturesInflightMax.get()).isEqualTo(maxInflight);
        }
 
-       private void verifyFairness(List<Integer> values) {
-               int previousValue = 1;
-               for (int i = 0; i < values.size(); i++) {
-                       int value = values.get(i) % 100;
-                       if (value == 0) {
-                               value = 100;
-                       }
-                       assertThat(Math.abs(previousValue - value)).as("value 
%d at index %d", values.get(i), i)
-                                       .isLessThanOrEqualTo(48);
-                       previousValue = value;
+       @Disabled("This test was used to debug the flakiness issue")
+       @RepeatedTest(100)
+       void repeatShouldNotSubscribeMoreThanMaxInflightForMonos() {
+               shouldNotSubscribeMoreThanMaxInflightForMonos(7, 100);
+               shouldNotRequestOrSubscribeMoreThanMaxInflightForMonos(7, 100);
+               shouldNotRequestOrSubscribeMoreThanMaxInflightForFluxes(7, 100);
+       }
+
+       @ParameterizedTest
+       @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+       void shouldNotRequestOrSubscribeMoreThanMaxInflightForFluxes(int 
maxInflight, int maxElements) {
+               int subfluxSize = 3;
+
+               InflightLimiter inflightLimiter = new 
InflightLimiter(maxInflight, maxInflight, Schedulers.single(),
+                               
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+
+               AtomicLong totalRequests = new AtomicLong();
+               AtomicLong requestsMax = new AtomicLong();
+               AtomicInteger subscriptionsActiveBeforeCompletingFirstElement = 
new AtomicInteger();
+               AtomicInteger subscriptionsMax = new AtomicInteger();
+
+               List<Integer> inputValues = IntStream.rangeClosed(1, 
maxElements).boxed().collect(Collectors.toList());
+
+               Flux<Integer> flux = 
Flux.fromIterable(inputValues).window(subfluxSize).flatMap((subFlux) -> {
+                       AtomicLong currentRequests = new AtomicLong();
+                       return subFlux.flatMap((i) -> 
Mono.delay(Duration.ofMillis(25L)).thenReturn(i)).index()
+                                       .doOnSubscribe((subscription) -> {
+                                               int currentSubscriptionsCount = 
subscriptionsActiveBeforeCompletingFirstElement
+                                                               
.incrementAndGet();
+                                               
subscriptionsMax.accumulateAndGet(currentSubscriptionsCount, Math::max);
+                                       }).doOnRequest((requested) -> {
+                                               
currentRequests.addAndGet(requested);
+                                               long current = 
totalRequests.addAndGet(requested);
+                                               
requestsMax.accumulateAndGet(current, Math::max);
+                                       }).doOnNext((indexed) -> {
+                                               
currentRequests.decrementAndGet();
+                                               totalRequests.decrementAndGet();
+                                               if (indexed.getT1() == 0L) {
+                                                       
subscriptionsActiveBeforeCompletingFirstElement.decrementAndGet();
+                                               }
+                                       }).doFinally((__) -> 
totalRequests.addAndGet(-currentRequests.getAndSet(0)))
+                                       
.transform(inflightLimiter::transform).subscribeOn(Schedulers.parallel());
+               }, maxElements).map(Tuple2::getT2);
+
+               List<Integer> values = flux.collectList().block();
+               
assertThat(values).containsExactlyInAnyOrderElementsOf(inputValues);
+               assertThat(requestsMax.get()).isEqualTo(maxInflight);
+               assertThat(subscriptionsMax.get()).isEqualTo(maxInflight);
+       }
+
+       @Test
+       void shouldSpreadRequestsEvenlyAcrossUpstream() {
+               InflightLimiter inflightLimiter = new InflightLimiter(1, 1, 
Schedulers.single(),
+                               
InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
+               List<Integer> values = Flux
+                               .merge(100, Flux.range(1, 
100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator),
+                                               Flux.range(101, 
100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator),
+                                               Flux.range(201, 
100).delayElements(Duration.ofMillis(1)).as(inflightLimiter::createOperator))
+                               .collectList().block();
+               assertThat(values).containsExactlyInAnyOrderElementsOf(
+                               IntStream.rangeClosed(1, 
300).boxed().collect(Collectors.toList()));
+               for (int i = 0; i < 100; i = i + 3) {
+                       assertThat(new int[] { values.get(i), values.get(i + 
1), values.get(i + 2) })
+                                       .containsExactly(values.get(i), 
values.get(i) + 100, values.get(i) + 200)
+                                       .as("values at index %d-%d", i, i + 2);
                }
        }
 

Reply via email to