This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 0ea7d2f Fix issue with max inflight limit in sendMany (#89)
0ea7d2f is described below
commit 0ea7d2f03bb377e930b7bc7953a00fe01cb15117
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 7 17:20:58 2022 +0200
Fix issue with max inflight limit in sendMany (#89)
* Add maxInflight test to AdaptedReactiveMessageSenderTest
* Fix issue with sendMany and max inflight limit
- refactor solution to apply the limiter to the mono that sends the message
---
.../adapter/AdaptedReactiveMessageSender.java | 16 ++---
.../client/internal/adapter/ProducerCache.java | 14 ++--
.../internal/adapter/ProducerCacheEntry.java | 4 ++
.../internal/adapter/ReactiveProducerAdapter.java | 26 ++++---
.../adapter/AdaptedReactiveMessageSenderTest.java | 81 ++++++++++++++++++++++
5 files changed, 115 insertions(+), 26 deletions(-)
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 7ed0062..0967b64 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -183,24 +183,24 @@ class AdaptedReactiveMessageSender<T> implements
ReactiveMessageSender<T> {
@Override
public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
- return createReactiveProducerAdapter().usingProducer((producer)
-> createMessageMono(messageSpec, producer));
+ return createReactiveProducerAdapter()
+ .usingProducer((producer, transformer) ->
createMessageMono(messageSpec, producer, transformer));
}
- private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec,
Producer<T> producer) {
+ private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec,
Producer<T> producer,
+ PublisherTransformer transformer) {
return PulsarFutureAdapter.adaptPulsarFuture(() -> {
TypedMessageBuilder<T> typedMessageBuilder =
producer.newMessage();
((InternalMessageSpec<T>)
messageSpec).configure(typedMessageBuilder);
return typedMessageBuilder.sendAsync();
- });
+ }).transform(transformer::transform);
}
@Override
public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs)
{
- return
createReactiveProducerAdapter().usingProducerMany((producer) ->
- // TODO: ensure that inner publishers are subscribed in order
so that message
- // order is retained
- Flux.from(messageSpecs).flatMapSequential((messageSpec) ->
createMessageMono(messageSpec, producer),
- this.maxConcurrency));
+ return createReactiveProducerAdapter()
+ .usingProducerMany((producer, transformer) ->
Flux.from(messageSpecs).flatMapSequential(
+ (messageSpec) ->
createMessageMono(messageSpec, producer, transformer), this.maxConcurrency));
}
}
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
index c538437..81449b1 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
@@ -17,7 +17,7 @@
package org.apache.pulsar.reactive.client.internal.adapter;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Producer;
@@ -50,10 +50,10 @@ class ProducerCache implements ReactiveMessageSenderCache {
<T, R> Mono<R> usingCachedProducer(ProducerCacheKey cacheKey,
Mono<Producer<T>> producerMono,
Supplier<PublisherTransformer>
producerActionTransformer,
- Function<Producer<T>, Mono<R>> usingProducerAction) {
+ BiFunction<Producer<T>, PublisherTransformer, Mono<R>>
usingProducerAction) {
return Mono.usingWhen(this.leaseCacheEntry(cacheKey,
producerMono, producerActionTransformer),
- (producerCacheEntry) ->
usingProducerAction.apply(producerCacheEntry.getProducer())
-
.as(producerCacheEntry::decorateProducerAction),
+ (producerCacheEntry) ->
usingProducerAction.apply(producerCacheEntry.getProducer(),
+
producerCacheEntry.getProducerActionTransformer()),
this::returnCacheEntry);
}
@@ -69,10 +69,10 @@ class ProducerCache implements ReactiveMessageSenderCache {
<T, R> Flux<R> usingCachedProducerMany(ProducerCacheKey cacheKey,
Mono<Producer<T>> producerMono,
Supplier<PublisherTransformer>
producerActionTransformer,
- Function<Producer<T>, Flux<R>> usingProducerAction) {
+ BiFunction<Producer<T>, PublisherTransformer, Flux<R>>
usingProducerAction) {
return Flux.usingWhen(this.leaseCacheEntry(cacheKey,
producerMono, producerActionTransformer),
- (producerCacheEntry) ->
usingProducerAction.apply(producerCacheEntry.getProducer())
-
.as(producerCacheEntry::decorateProducerAction),
+ (producerCacheEntry) ->
usingProducerAction.apply(producerCacheEntry.getProducer(),
+
producerCacheEntry.getProducerActionTransformer()),
this::returnCacheEntry);
}
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
index 1f185c1..6f802d5 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
@@ -133,4 +133,8 @@ class ProducerCacheEntry implements AutoCloseable {
return
Mono.from(this.producerActionTransformer.transform(source));
}
+ PublisherTransformer getProducerActionTransformer() {
+ return this.producerActionTransformer;
+ }
+
}
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index a99ff9c..b65f040 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -16,6 +16,7 @@
package org.apache.pulsar.reactive.client.internal.adapter;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -72,7 +73,7 @@ class ReactiveProducerAdapter<T> {
return
AdapterImplementationFactory.adaptPulsarFuture(producer::closeAsync);
}
- <R> Mono<R> usingProducer(Function<Producer<T>, Mono<R>>
usingProducerAction) {
+ <R> Mono<R> usingProducer(BiFunction<Producer<T>, PublisherTransformer,
Mono<R>> usingProducerAction) {
if (this.producerCache != null) {
return usingCachedProducer(usingProducerAction);
}
@@ -81,16 +82,16 @@ class ReactiveProducerAdapter<T> {
}
}
- private <R> Mono<R> usingUncachedProducer(Function<Producer<T>,
Mono<R>> usingProducerAction) {
+ private <R> Mono<R> usingUncachedProducer(
+ BiFunction<Producer<T>, PublisherTransformer, Mono<R>>
usingProducerAction) {
return Mono.usingWhen(createProducerMono(),
(producer) ->
Mono.using(this.producerActionTransformer::get,
- (transformer) ->
usingProducerAction.apply(producer)
- .as((mono) ->
Mono.from(transformer.transform(mono))),
- Disposable::dispose),
+ (transformer) ->
usingProducerAction.apply(producer, transformer), Disposable::dispose),
this::closeProducer);
}
- private <R> Mono<R> usingCachedProducer(Function<Producer<T>, Mono<R>>
usingProducerAction) {
+ private <R> Mono<R> usingCachedProducer(
+ BiFunction<Producer<T>, PublisherTransformer, Mono<R>>
usingProducerAction) {
return
createCachedProducerKeyAndMono().flatMap((keyAndProducerMono) -> {
ProducerCacheKey cacheKey = keyAndProducerMono.getT1();
Mono<Producer<T>> producerMono =
keyAndProducerMono.getT2();
@@ -99,7 +100,7 @@ class ReactiveProducerAdapter<T> {
});
}
- <R> Flux<R> usingProducerMany(Function<Producer<T>, Flux<R>>
usingProducerAction) {
+ <R> Flux<R> usingProducerMany(BiFunction<Producer<T>,
PublisherTransformer, Flux<R>> usingProducerAction) {
if (this.producerCache != null) {
return usingCachedProducerMany(usingProducerAction);
}
@@ -108,13 +109,16 @@ class ReactiveProducerAdapter<T> {
}
}
- private <R> Flux<R> usingUncachedProducerMany(Function<Producer<T>,
Flux<R>> usingProducerAction) {
- return Flux.usingWhen(createProducerMono(), (producer) ->
Flux.using(this.producerActionTransformer::get,
- (transformer) ->
usingProducerAction.apply(producer).as(transformer::transform),
Disposable::dispose),
+ private <R> Flux<R> usingUncachedProducerMany(
+ BiFunction<Producer<T>, PublisherTransformer, Flux<R>>
usingProducerAction) {
+ return Flux.usingWhen(createProducerMono(),
+ (producer) ->
Flux.using(this.producerActionTransformer::get,
+ (transformer) ->
usingProducerAction.apply(producer, transformer), Disposable::dispose),
this::closeProducer);
}
- private <R> Flux<R> usingCachedProducerMany(Function<Producer<T>,
Flux<R>> usingProducerAction) {
+ private <R> Flux<R> usingCachedProducerMany(
+ BiFunction<Producer<T>, PublisherTransformer, Flux<R>>
usingProducerAction) {
return
createCachedProducerKeyAndMono().flatMapMany((keyAndProducerMono) -> {
ProducerCacheKey cacheKey = keyAndProducerMono.getT1();
Mono<Producer<T>> producerMono =
keyAndProducerMono.getT2();
diff --git
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index 8d24258..43f73dd 100644
---
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -19,10 +19,17 @@ package org.apache.pulsar.reactive.client.internal.adapter;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.pulsar.client.api.BatcherBuilder;
@@ -36,10 +43,12 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.internal.DefaultImplementation;
import
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
@@ -59,6 +68,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -318,4 +328,75 @@ class AdaptedReactiveMessageSenderTest {
assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4),
Duration.ofSeconds(5));
}
+ @Test
+ void maxInFlightUsingSendOne() throws Exception {
+ doTestMaxInFlight((reactiveSender, inputFlux) -> inputFlux
+ .flatMap((i) ->
reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100));
+ }
+
+ @Test
+ void maxInFlightUsingSendMany() throws Exception {
+ doTestMaxInFlight((reactiveSender, inputFlux) ->
inputFlux.window(3).flatMap(
+ (subFlux) -> subFlux.map((i) ->
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100));
+ }
+
+ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>,
Flux<Integer>, Flux<MessageId>> sendingFunction)
+ throws Exception {
+ ScheduledExecutorService executorService = null;
+ try {
+ executorService =
Executors.newSingleThreadScheduledExecutor();
+ final ScheduledExecutorService finalExecutorService =
executorService;
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+ AtomicLong totalRequests = new AtomicLong();
+ AtomicLong requestsMax = new AtomicLong();
+ ProducerBase<String> producer =
mock(ProducerBase.class);
+
given(producer.closeAsync()).willReturn(CompletableFuture.completedFuture(null));
+ given(producer.isConnected()).willReturn(true);
+ given(producer.newMessage()).willAnswer((__) -> {
+ TypedMessageBuilderImpl<String>
typedMessageBuilder = spy(
+ new
TypedMessageBuilderImpl<>(producer, Schema.STRING));
+
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
+ CompletableFuture<MessageId>
messageSender = new CompletableFuture<>();
+ finalExecutorService.execute(() -> {
+ long current =
totalRequests.incrementAndGet();
+
requestsMax.accumulateAndGet(current, Math::max);
+ });
+ finalExecutorService.schedule(() -> {
+ totalRequests.decrementAndGet();
+ // encode integer in message
value to entry id in message id
+ int encodedEntryId =
Integer.parseInt(typedMessageBuilder.getMessage().getValue());
+ messageSender.complete(
+
DefaultImplementation.getDefaultImplementation().newMessageId(1,
encodedEntryId, 1));
+ }, 5, TimeUnit.MILLISECONDS);
+ return messageSender;
+ });
+ return typedMessageBuilder;
+ });
+
+ given(pulsarClient.createProducerAsync(any(),
eq(Schema.STRING), isNull()))
+
.willReturn(CompletableFuture.completedFuture(producer));
+
+ ReactiveMessageSender<String> reactiveSender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).maxInflight(7).cache(AdaptedReactivePulsarClientFactory.createCache())
+
.maxConcurrentSenderSubscriptions(1024).topic("my-topic").build();
+
+ List<Integer> inputValues = IntStream.rangeClosed(1,
1000).boxed().collect(Collectors.toList());
+
+ Flux<Integer> inputFlux =
Flux.fromIterable(inputValues);
+ Flux<MessageId> outputFlux =
sendingFunction.apply(reactiveSender, inputFlux);
+
+ // get message value from encoded entry id in message id
+ List<Integer> outputValues = outputFlux.map((m) ->
(int) ((MessageIdImpl) m).getEntryId()).collectList()
+ .block();
+
assertThat(outputValues).containsExactlyInAnyOrderElementsOf(inputValues);
+ assertThat(requestsMax.get()).isEqualTo(7);
+ }
+ finally {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+
}