This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 3340083  Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)
3340083 is described below

commit 334008356ddb98a611058394f8903d4f08c2d6e6
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 7 17:21:48 2022 +0200

    Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)
---
 .../pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java    | 2 +-
 .../client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java    | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index bc40351..f3acda1 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -62,7 +62,7 @@ class ReactiveMessageSenderE2ETest {
                                ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
 
                                ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
-                                               .topic(topicName).maxInflight(1).build();
+                                               .topic(topicName).build();
                                MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
                                assertThat(messageId).isNotNull();
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
index 18fb53a..20f75f8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
@@ -99,6 +99,7 @@ class AdaptedReactiveMessageSenderBuilder<T> implements ReactiveMessageSenderBui
        public ReactiveMessageSender<T> build() {
                Object producerActionTransformerKey;
                if (this.maxInflight > 0) {
+                       Objects.requireNonNull(this.producerCache, "cache must be provided when maxInflight is set.");
                        this.producerActionTransformer = () -> new InflightLimiter(this.maxInflight,
                                        Math.max(this.maxInflight / 2, 1), Schedulers.single(), this.maxConcurrentSenderSubscriptions);
                        producerActionTransformerKey = new ProducerActionTransformerKey(this.maxInflight,

Reply via email to