cbornet commented on code in PR #89:
URL:
https://github.com/apache/pulsar-client-reactive/pull/89#discussion_r1042339689
##########
pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java:
##########
@@ -318,4 +328,75 @@ void senderCacheEntryRecreatedIfProducerClosed() throws
Exception {
assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4),
Duration.ofSeconds(5));
}
+ @Test
+ void maxInFlightUsingSendOne() throws Exception {
+ doTestMaxInFlight((reactiveSender, inputFlux) -> inputFlux
+ .flatMap((i) ->
reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100));
+ }
+
+ @Test
+ void maxInFlightUsingSendMany() throws Exception {
+ doTestMaxInFlight((reactiveSender, inputFlux) ->
inputFlux.window(3).flatMap(
+ (subFlux) -> subFlux.map((i) ->
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100));
+ }
+
+ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>,
Flux<Integer>, Flux<MessageId>> sendingFunction)
+ throws Exception {
+ ScheduledExecutorService executorService = null;
+ try {
+ executorService =
Executors.newSingleThreadScheduledExecutor();
+ final ScheduledExecutorService finalExecutorService =
executorService;
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+ AtomicLong totalRequests = new AtomicLong();
+ AtomicLong requestsMax = new AtomicLong();
+ ProducerBase<String> producer =
mock(ProducerBase.class);
+
given(producer.closeAsync()).willReturn(CompletableFuture.completedFuture(null));
+ given(producer.isConnected()).willReturn(true);
+ given(producer.newMessage()).willAnswer((__) -> {
+ TypedMessageBuilderImpl<String>
typedMessageBuilder = spy(
+ new
TypedMessageBuilderImpl<>(producer, Schema.STRING));
+
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
+ CompletableFuture<MessageId>
messageSender = new CompletableFuture<>();
+ finalExecutorService.execute(() -> {
+ long current =
totalRequests.incrementAndGet();
+
requestsMax.accumulateAndGet(current, Math::max);
+ });
+ finalExecutorService.schedule(() -> {
+ totalRequests.decrementAndGet();
+ // encode integer in message
value to entry id in message id
+ int encodedEntryId =
Integer.parseInt(typedMessageBuilder.getMessage().getValue());
+ messageSender.complete(
+
DefaultImplementation.getDefaultImplementation().newMessageId(1,
encodedEntryId, 1));
+ }, 5, TimeUnit.MILLISECONDS);
+ return messageSender;
+ });
+ return typedMessageBuilder;
+ });
+
+ given(pulsarClient.createProducerAsync(any(),
eq(Schema.STRING), isNull()))
+
.willReturn(CompletableFuture.completedFuture(producer));
+
+ ReactiveMessageSender<String> reactiveSender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).maxInflight(7).cache(AdaptedReactivePulsarClientFactory.createCache())
Review Comment:
We could test several values of maxInFlight with parameterized tests like
what has been done for InflightLimiterTest.
Can be done in follow-up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]