This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ea7d2f  Fix issue with max inflight limit in sendMany (#89)
0ea7d2f is described below

commit 0ea7d2f03bb377e930b7bc7953a00fe01cb15117
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 7 17:20:58 2022 +0200

    Fix issue with max inflight limit in sendMany (#89)
    
    * Add maxInflight test to AdaptedReactiveMessageSenderTest
    
    * Fix issue with sendMany and max inflight limit
    
    - refactor solution to apply the limiter to the mono that sends the message
---
 .../adapter/AdaptedReactiveMessageSender.java      | 16 ++---
 .../client/internal/adapter/ProducerCache.java     | 14 ++--
 .../internal/adapter/ProducerCacheEntry.java       |  4 ++
 .../internal/adapter/ReactiveProducerAdapter.java  | 26 ++++---
 .../adapter/AdaptedReactiveMessageSenderTest.java  | 81 ++++++++++++++++++++++
 5 files changed, 115 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 7ed0062..0967b64 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -183,24 +183,24 @@ class AdaptedReactiveMessageSender<T> implements 
ReactiveMessageSender<T> {
 
        @Override
        public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
-               return createReactiveProducerAdapter().usingProducer((producer) 
-> createMessageMono(messageSpec, producer));
+               return createReactiveProducerAdapter()
+                               .usingProducer((producer, transformer) -> 
createMessageMono(messageSpec, producer, transformer));
        }
 
-       private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, 
Producer<T> producer) {
+       private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, 
Producer<T> producer,
+                       PublisherTransformer transformer) {
                return PulsarFutureAdapter.adaptPulsarFuture(() -> {
                        TypedMessageBuilder<T> typedMessageBuilder = 
producer.newMessage();
                        ((InternalMessageSpec<T>) 
messageSpec).configure(typedMessageBuilder);
                        return typedMessageBuilder.sendAsync();
-               });
+               }).transform(transformer::transform);
        }
 
        @Override
        public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs) 
{
-               return 
createReactiveProducerAdapter().usingProducerMany((producer) ->
-               // TODO: ensure that inner publishers are subscribed in order 
so that message
-               // order is retained
-               Flux.from(messageSpecs).flatMapSequential((messageSpec) -> 
createMessageMono(messageSpec, producer),
-                               this.maxConcurrency));
+               return createReactiveProducerAdapter()
+                               .usingProducerMany((producer, transformer) -> 
Flux.from(messageSpecs).flatMapSequential(
+                                               (messageSpec) -> 
createMessageMono(messageSpec, producer, transformer), this.maxConcurrency));
        }
 
 }
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
index c538437..81449b1 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
@@ -17,7 +17,7 @@
 package org.apache.pulsar.reactive.client.internal.adapter;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.function.BiFunction;
 import java.util.function.Supplier;
 
 import org.apache.pulsar.client.api.Producer;
@@ -50,10 +50,10 @@ class ProducerCache implements ReactiveMessageSenderCache {
 
        <T, R> Mono<R> usingCachedProducer(ProducerCacheKey cacheKey, 
Mono<Producer<T>> producerMono,
                        Supplier<PublisherTransformer> 
producerActionTransformer,
-                       Function<Producer<T>, Mono<R>> usingProducerAction) {
+                       BiFunction<Producer<T>, PublisherTransformer, Mono<R>> 
usingProducerAction) {
                return Mono.usingWhen(this.leaseCacheEntry(cacheKey, 
producerMono, producerActionTransformer),
-                               (producerCacheEntry) -> 
usingProducerAction.apply(producerCacheEntry.getProducer())
-                                               
.as(producerCacheEntry::decorateProducerAction),
+                               (producerCacheEntry) -> 
usingProducerAction.apply(producerCacheEntry.getProducer(),
+                                               
producerCacheEntry.getProducerActionTransformer()),
                                this::returnCacheEntry);
        }
 
@@ -69,10 +69,10 @@ class ProducerCache implements ReactiveMessageSenderCache {
 
        <T, R> Flux<R> usingCachedProducerMany(ProducerCacheKey cacheKey, 
Mono<Producer<T>> producerMono,
                        Supplier<PublisherTransformer> 
producerActionTransformer,
-                       Function<Producer<T>, Flux<R>> usingProducerAction) {
+                       BiFunction<Producer<T>, PublisherTransformer, Flux<R>> 
usingProducerAction) {
                return Flux.usingWhen(this.leaseCacheEntry(cacheKey, 
producerMono, producerActionTransformer),
-                               (producerCacheEntry) -> 
usingProducerAction.apply(producerCacheEntry.getProducer())
-                                               
.as(producerCacheEntry::decorateProducerAction),
+                               (producerCacheEntry) -> 
usingProducerAction.apply(producerCacheEntry.getProducer(),
+                                               
producerCacheEntry.getProducerActionTransformer()),
                                this::returnCacheEntry);
        }
 
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
index 1f185c1..6f802d5 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
@@ -133,4 +133,8 @@ class ProducerCacheEntry implements AutoCloseable {
                return 
Mono.from(this.producerActionTransformer.transform(source));
        }
 
+       PublisherTransformer getProducerActionTransformer() {
+               return this.producerActionTransformer;
+       }
+
 }
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index a99ff9c..b65f040 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -16,6 +16,7 @@
 
 package org.apache.pulsar.reactive.client.internal.adapter;
 
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -72,7 +73,7 @@ class ReactiveProducerAdapter<T> {
                return 
AdapterImplementationFactory.adaptPulsarFuture(producer::closeAsync);
        }
 
-       <R> Mono<R> usingProducer(Function<Producer<T>, Mono<R>> 
usingProducerAction) {
+       <R> Mono<R> usingProducer(BiFunction<Producer<T>, PublisherTransformer, 
Mono<R>> usingProducerAction) {
                if (this.producerCache != null) {
                        return usingCachedProducer(usingProducerAction);
                }
@@ -81,16 +82,16 @@ class ReactiveProducerAdapter<T> {
                }
        }
 
-       private <R> Mono<R> usingUncachedProducer(Function<Producer<T>, 
Mono<R>> usingProducerAction) {
+       private <R> Mono<R> usingUncachedProducer(
+                       BiFunction<Producer<T>, PublisherTransformer, Mono<R>> 
usingProducerAction) {
                return Mono.usingWhen(createProducerMono(),
                                (producer) -> 
Mono.using(this.producerActionTransformer::get,
-                                               (transformer) -> 
usingProducerAction.apply(producer)
-                                                               .as((mono) -> 
Mono.from(transformer.transform(mono))),
-                                               Disposable::dispose),
+                                               (transformer) -> 
usingProducerAction.apply(producer, transformer), Disposable::dispose),
                                this::closeProducer);
        }
 
-       private <R> Mono<R> usingCachedProducer(Function<Producer<T>, Mono<R>> 
usingProducerAction) {
+       private <R> Mono<R> usingCachedProducer(
+                       BiFunction<Producer<T>, PublisherTransformer, Mono<R>> 
usingProducerAction) {
                return 
createCachedProducerKeyAndMono().flatMap((keyAndProducerMono) -> {
                        ProducerCacheKey cacheKey = keyAndProducerMono.getT1();
                        Mono<Producer<T>> producerMono = 
keyAndProducerMono.getT2();
@@ -99,7 +100,7 @@ class ReactiveProducerAdapter<T> {
                });
        }
 
-       <R> Flux<R> usingProducerMany(Function<Producer<T>, Flux<R>> 
usingProducerAction) {
+       <R> Flux<R> usingProducerMany(BiFunction<Producer<T>, 
PublisherTransformer, Flux<R>> usingProducerAction) {
                if (this.producerCache != null) {
                        return usingCachedProducerMany(usingProducerAction);
                }
@@ -108,13 +109,16 @@ class ReactiveProducerAdapter<T> {
                }
        }
 
-       private <R> Flux<R> usingUncachedProducerMany(Function<Producer<T>, 
Flux<R>> usingProducerAction) {
-               return Flux.usingWhen(createProducerMono(), (producer) -> 
Flux.using(this.producerActionTransformer::get,
-                               (transformer) -> 
usingProducerAction.apply(producer).as(transformer::transform), 
Disposable::dispose),
+       private <R> Flux<R> usingUncachedProducerMany(
+                       BiFunction<Producer<T>, PublisherTransformer, Flux<R>> 
usingProducerAction) {
+               return Flux.usingWhen(createProducerMono(),
+                               (producer) -> 
Flux.using(this.producerActionTransformer::get,
+                                               (transformer) -> 
usingProducerAction.apply(producer, transformer), Disposable::dispose),
                                this::closeProducer);
        }
 
-       private <R> Flux<R> usingCachedProducerMany(Function<Producer<T>, 
Flux<R>> usingProducerAction) {
+       private <R> Flux<R> usingCachedProducerMany(
+                       BiFunction<Producer<T>, PublisherTransformer, Flux<R>> 
usingProducerAction) {
                return 
createCachedProducerKeyAndMono().flatMapMany((keyAndProducerMono) -> {
                        ProducerCacheKey cacheKey = keyAndProducerMono.getT1();
                        Mono<Producer<T>> producerMono = 
keyAndProducerMono.getT2();
diff --git 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index 8d24258..43f73dd 100644
--- 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -19,10 +19,17 @@ package org.apache.pulsar.reactive.client.internal.adapter;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.pulsar.client.api.BatcherBuilder;
@@ -36,10 +43,12 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.internal.DefaultImplementation;
 import 
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
@@ -59,6 +68,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -318,4 +328,75 @@ class AdaptedReactiveMessageSenderTest {
                assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4), 
Duration.ofSeconds(5));
        }
 
+       @Test
+       void maxInFlightUsingSendOne() throws Exception {
+               doTestMaxInFlight((reactiveSender, inputFlux) -> inputFlux
+                               .flatMap((i) -> 
reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100));
+       }
+
+       @Test
+       void maxInFlightUsingSendMany() throws Exception {
+               doTestMaxInFlight((reactiveSender, inputFlux) -> 
inputFlux.window(3).flatMap(
+                               (subFlux) -> subFlux.map((i) -> 
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100));
+       }
+
+       void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, 
Flux<Integer>, Flux<MessageId>> sendingFunction)
+                       throws Exception {
+               ScheduledExecutorService executorService = null;
+               try {
+                       executorService = 
Executors.newSingleThreadScheduledExecutor();
+                       final ScheduledExecutorService finalExecutorService = 
executorService;
+                       PulsarClientImpl pulsarClient = spy(
+                                       (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://dummy";).build());
+                       AtomicLong totalRequests = new AtomicLong();
+                       AtomicLong requestsMax = new AtomicLong();
+                       ProducerBase<String> producer = 
mock(ProducerBase.class);
+                       
given(producer.closeAsync()).willReturn(CompletableFuture.completedFuture(null));
+                       given(producer.isConnected()).willReturn(true);
+                       given(producer.newMessage()).willAnswer((__) -> {
+                               TypedMessageBuilderImpl<String> 
typedMessageBuilder = spy(
+                                               new 
TypedMessageBuilderImpl<>(producer, Schema.STRING));
+                               
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
+                                       CompletableFuture<MessageId> 
messageSender = new CompletableFuture<>();
+                                       finalExecutorService.execute(() -> {
+                                               long current = 
totalRequests.incrementAndGet();
+                                               
requestsMax.accumulateAndGet(current, Math::max);
+                                       });
+                                       finalExecutorService.schedule(() -> {
+                                               totalRequests.decrementAndGet();
+                                               // encode integer in message 
value to entry id in message id
+                                               int encodedEntryId = 
Integer.parseInt(typedMessageBuilder.getMessage().getValue());
+                                               messageSender.complete(
+                                                               
DefaultImplementation.getDefaultImplementation().newMessageId(1, 
encodedEntryId, 1));
+                                       }, 5, TimeUnit.MILLISECONDS);
+                                       return messageSender;
+                               });
+                               return typedMessageBuilder;
+                       });
+
+                       given(pulsarClient.createProducerAsync(any(), 
eq(Schema.STRING), isNull()))
+                                       
.willReturn(CompletableFuture.completedFuture(producer));
+
+                       ReactiveMessageSender<String> reactiveSender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+                                       
.messageSender(Schema.STRING).maxInflight(7).cache(AdaptedReactivePulsarClientFactory.createCache())
+                                       
.maxConcurrentSenderSubscriptions(1024).topic("my-topic").build();
+
+                       List<Integer> inputValues = IntStream.rangeClosed(1, 
1000).boxed().collect(Collectors.toList());
+
+                       Flux<Integer> inputFlux = 
Flux.fromIterable(inputValues);
+                       Flux<MessageId> outputFlux = 
sendingFunction.apply(reactiveSender, inputFlux);
+
+                       // get message value from encoded entry id in message id
+                       List<Integer> outputValues = outputFlux.map((m) -> 
(int) ((MessageIdImpl) m).getEntryId()).collectList()
+                                       .block();
+                       
assertThat(outputValues).containsExactlyInAnyOrderElementsOf(inputValues);
+                       assertThat(requestsMax.get()).isEqualTo(7);
+               }
+               finally {
+                       if (executorService != null) {
+                               executorService.shutdownNow();
+                       }
+               }
+       }
+
 }

Reply via email to