This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 855ce09  Accept Reactive Streams Publisher as input (#12)
855ce09 is described below

commit 855ce09d269f47739664ed3ece3452e066c305fa
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Oct 24 13:31:53 2022 +0200

    Accept Reactive Streams Publisher as input (#12)
---
 README.adoc                                             |  4 ++--
 .../client/adapter/ReactiveMessageConsumerE2ETest.java  |  2 +-
 .../client/adapter/ReactiveMessagePipelineE2ETest.java  |  4 ++--
 .../client/adapter/ReactiveMessageReaderE2ETest.java    |  2 +-
 .../client/adapter/ReactiveMessageSenderE2ETest.java    |  5 ++---
 .../adapter/AdaptedReactiveMessageConsumer.java         | 14 +++++++++-----
 .../internal/adapter/AdaptedReactiveMessageSender.java  | 11 ++++++-----
 .../reactive/client/api/ReactiveMessageConsumer.java    |  3 ++-
 .../client/api/ReactiveMessagePipelineBuilder.java      |  7 ++++---
 .../reactive/client/api/ReactiveMessageSender.java      | 17 +++++++++++++++--
 .../api/DefaultReactiveMessagePipelineBuilder.java      | 13 +++++++------
 11 files changed, 51 insertions(+), 31 deletions(-)

diff --git a/README.adoc b/README.adoc
index 6516dfa..a4760c2 100644
--- a/README.adoc
+++ b/README.adoc
@@ -74,7 +74,7 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
         .maxInflight(100)
         .build();
 Mono<MessageId> messageId = messageSender
-        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+        .send(MessageSpec.of("Hello world!"));
 // for demonstration
 messageId.subscribe(System.out::println);
 ----
@@ -122,7 +122,7 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
         .maxInflight(100)
         .build();
 Mono<MessageId> messageId = messageSender
-        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+        .send(MessageSpec.of("Hello world!"));
 // for demonstration
 messageId.subscribe(System.out::println);
 ----
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
index 2a5ada8..833ce88 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
@@ -48,7 +48,7 @@ public class ReactiveMessageConsumerE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.cache(producerCache).topic(topicName).build();
-                       messageSender.sendMessages(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+                       messageSender.send(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
                        ReactiveMessageConsumer<String> messageConsumer = 
reactivePulsarClient.messageConsumer(Schema.STRING)
                                        
.topic(topicName).subscriptionName("sub").build();
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index a0927eb..5b69a51 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -68,7 +68,7 @@ public class ReactiveMessagePipelineE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        .topic(topicName).build();
-                       messageSender.sendMessages(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+                       messageSender.send(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
                        List<String> messages = 
Collections.synchronizedList(new ArrayList<>());
                        CountDownLatch latch = new CountDownLatch(100);
@@ -102,7 +102,7 @@ public class ReactiveMessagePipelineE2ETest {
                        List<MessageSpec<Integer>> messageSpecs = 
generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
                                        messageOrderScenario);
 
-                       
messageSender.sendMessages(Flux.fromIterable(messageSpecs)).blockLast();
+                       
messageSender.send(Flux.fromIterable(messageSpecs)).blockLast();
 
                        ConcurrentMap<Integer, List<Integer>> messages = new 
ConcurrentHashMap<>();
                        CountDownLatch latch = new 
CountDownLatch(messageSpecs.size());
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
index ed7a7f9..a82fd3b 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
@@ -46,7 +46,7 @@ public class ReactiveMessageReaderE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.cache(producerCache).topic(topicName).build();
-                       messageSender.sendMessages(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+                       messageSender.send(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
                        ReactiveMessageReader<String> messageReader = 
reactivePulsarClient.messageReader(Schema.STRING)
                                        .topic(topicName).build();
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 927f715..078a0d6 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import reactor.core.publisher.Mono;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -64,7 +63,7 @@ public class ReactiveMessageSenderE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.topic(topicName).maxInflight(1).build();
-                       MessageId messageId = 
messageSender.sendMessage(Mono.just(MessageSpec.of("Hello world!"))).block();
+                       MessageId messageId = 
messageSender.send(MessageSpec.of("Hello world!")).block();
                        assertThat(messageId).isNotNull();
 
                        Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
@@ -87,7 +86,7 @@ public class ReactiveMessageSenderE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.cache(producerCache).maxInflight(1).topic(topicName).build();
-                       MessageId messageId = 
messageSender.sendMessage(Mono.just(MessageSpec.of("Hello world!"))).block();
+                       MessageId messageId = 
messageSender.send(MessageSpec.of("Hello world!")).block();
                        assertThat(messageId).isNotNull();
 
                        Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index d3b5240..cdd44e7 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.reactive.client.api.MessageResult;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.SynchronousSink;
@@ -205,11 +206,14 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
        }
 
        @Override
-       public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, 
Flux<MessageResult<R>>> messageHandler) {
-               return 
createReactiveConsumerAdapter().usingConsumerMany((consumer) -> 
Flux.using(this::pinAcknowledgeScheduler,
-                               (pinnedAcknowledgeScheduler) -> 
messageHandler.apply(readNextMessage(consumer).repeat()).delayUntil(
-                                               (messageResult) -> 
handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
-                                               
.handle(this::handleMessageResult),
+       public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler) {
+               return 
createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(
+                               this::pinAcknowledgeScheduler, (
+                                               pinnedAcknowledgeScheduler) -> 
Flux
+                                                               
.from(messageHandler.apply(readNextMessage(consumer).repeat()))
+                                                               
.delayUntil((messageResult) -> handleAcknowledgement(consumer, messageResult,
+                                                                               
pinnedAcknowledgeScheduler))
+                                                               
.handle(this::handleMessageResult),
                                Scheduler::dispose));
        }
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index d5843cc..038b885 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
 import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -178,9 +179,8 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
        }
 
        @Override
-       public Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec) {
-               return createReactiveProducerAdapter()
-                               .usingProducer((producer) -> 
messageSpec.flatMap((m) -> createMessageMono(m, producer)));
+       public Mono<MessageId> send(MessageSpec<T> messageSpec) {
+               return createReactiveProducerAdapter().usingProducer((producer) 
-> createMessageMono(messageSpec, producer));
        }
 
        private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, 
Producer<T> producer) {
@@ -192,11 +192,12 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
        }
 
        @Override
-       public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
+       public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
                return 
createReactiveProducerAdapter().usingProducerMany((producer) ->
                // TODO: ensure that inner publishers are subscribed in order 
so that message
                // order is retained
-               messageSpecs.flatMapSequential((messageSpec) -> 
createMessageMono(messageSpec, producer), this.maxConcurrency));
+               Flux.from(messageSpecs).flatMapSequential((messageSpec) -> 
createMessageMono(messageSpec, producer),
+                               this.maxConcurrency));
        }
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 728f089..312fa48 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -19,6 +19,7 @@ package org.apache.pulsar.reactive.client.api;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -26,7 +27,7 @@ public interface ReactiveMessageConsumer<T> {
 
        <R> Mono<R> consumeMessage(Function<Mono<Message<T>>, 
Mono<MessageResult<R>>> messageHandler);
 
-       <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, 
Flux<MessageResult<R>>> messageHandler);
+       <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler);
 
        /**
         * Creates the Pulsar Consumer and immediately closes it. This is 
useful for creating
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
index ab8e51d..86359a0 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
@@ -21,18 +21,19 @@ import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
 public interface ReactiveMessagePipelineBuilder<T> {
 
-       OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, 
Mono<Void>> messageHandler);
+       OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, 
Publisher<Void>> messageHandler);
 
        ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
-                       Function<Flux<Message<T>>, Flux<MessageResult<Void>>> 
streamingMessageHandler);
+                       Function<Flux<Message<T>>, 
Publisher<MessageResult<Void>>> streamingMessageHandler);
 
-       ReactiveMessagePipelineBuilder<T> 
transformPipeline(Function<Mono<Void>, Mono<Void>> transformer);
+       ReactiveMessagePipelineBuilder<T> 
transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer);
 
        ReactiveMessagePipelineBuilder<T> pipelineRetrySpec(Retry 
pipelineRetrySpec);
 
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 2edeb0f..50640dc 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -17,13 +17,26 @@
 package org.apache.pulsar.reactive.client.api;
 
 import org.apache.pulsar.client.api.MessageId;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface ReactiveMessageSender<T> {
 
-       Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);
+       /**
+        * Send one message.
+        * @param messageSpec the spec of the message to send
+        * @return a publisher that will emit one message id and complete
+        */
+       Mono<MessageId> send(MessageSpec<T> messageSpec);
 
-       Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs);
+       /**
+        * Send multiple messages and get the associated message ids in the same order as the
+        * sent messages.
+        * @param messageSpecs the specs of the messages to send
+        * @return a publisher that will emit a message id per message successfully sent in
+        * the order that they have been sent
+        */
+       Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
index e92f683..59358ec 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.reactive.client.api.MessageResult;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
 import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
 import org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
@@ -55,7 +56,7 @@ class DefaultReactiveMessagePipelineBuilder<T>
 
        private final ReactiveMessageConsumer<T> messageConsumer;
 
-       private Function<Message<T>, Mono<Void>> messageHandler;
+       private Function<Message<T>, Publisher<Void>> messageHandler;
 
        private BiConsumer<Message<T>, Throwable> errorLogger;
 
@@ -67,9 +68,9 @@ class DefaultReactiveMessagePipelineBuilder<T>
 
        private Duration handlingTimeout = Duration.ofSeconds(120);
 
-       private Function<Mono<Void>, Mono<Void>> transformer = 
Function.identity();
+       private Function<Mono<Void>, Publisher<Void>> transformer = (it) -> it;
 
-       private Function<Flux<Message<T>>, Flux<MessageResult<Void>>> 
streamingMessageHandler;
+       private Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> 
streamingMessageHandler;
 
        private int concurrency;
 
@@ -82,14 +83,14 @@ class DefaultReactiveMessagePipelineBuilder<T>
        }
 
        @Override
-       public OneByOneMessagePipelineBuilder<T> 
messageHandler(Function<Message<T>, Mono<Void>> messageHandler) {
+       public OneByOneMessagePipelineBuilder<T> 
messageHandler(Function<Message<T>, Publisher<Void>> messageHandler) {
                this.messageHandler = messageHandler;
                return this;
        }
 
        @Override
        public ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
-                       Function<Flux<Message<T>>, Flux<MessageResult<Void>>> 
streamingMessageHandler) {
+                       Function<Flux<Message<T>>, 
Publisher<MessageResult<Void>>> streamingMessageHandler) {
                this.streamingMessageHandler = streamingMessageHandler;
                return this;
        }
@@ -145,7 +146,7 @@ class DefaultReactiveMessagePipelineBuilder<T>
        }
 
        @Override
-       public ReactiveMessagePipelineBuilder<T> 
transformPipeline(Function<Mono<Void>, Mono<Void>> transformer) {
+       public ReactiveMessagePipelineBuilder<T> 
transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer) {
                this.transformer = transformer;
                return this;
        }

Reply via email to