This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 271545e  Move ReactivePulsarClient::messagePipeline to ReactiveMessageConsumer (#17)
271545e is described below

commit 271545e6fd414337096fd61b514f99da9f9ce6af
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 9 13:45:00 2022 +0100

    Move ReactivePulsarClient::messagePipeline to ReactiveMessageConsumer (#17)
    
    * Make ReactivePulsarClient::messagePipeline static
    
    * Move ReactivePulsarClient::messagePipeline to ReactiveMessageConsumer
---
 README.adoc                                               | 14 +++++++-------
 .../client/adapter/ReactiveMessagePipelineE2ETest.java    |  8 +++-----
 .../reactive/client/api/ReactiveMessageConsumer.java      |  9 +++++++++
 .../pulsar/reactive/client/api/ReactivePulsarClient.java  | 15 +--------------
 4 files changed, 20 insertions(+), 26 deletions(-)

diff --git a/README.adoc b/README.adoc
index 2b884f2..28b109a 100644
--- a/README.adoc
+++ b/README.adoc
@@ -190,13 +190,13 @@ With `.endOfStreamAction(EndOfStreamAction.POLL)` the Reader will poll for new m
 
 [source,java]
 ----
-ReactiveMessageHandler reactiveMessagePipeline=
-       reactivePulsarClient
-        .messagePipeline(reactivePulsarClient
-           .messageConsumer(Schema.STRING)
-           .subscriptionName("sub")
-           .topic(topicName)
-           .build())
+ReactiveMessageHandler reactiveMessagePipeline =
+    reactivePulsarClient
+        .messageConsumer(Schema.STRING)
+        .subscriptionName("sub")
+        .topic(topicName)
+        .build()
+        .messagePipeline()
         .messageHandler(message -> Mono.fromRunnable(()->{
             System.out.println(message.getValue());
         }))
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 04c8c8d..a654a1a 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -73,9 +73,8 @@ public class ReactiveMessagePipelineE2ETest {
                        List<String> messages = 
Collections.synchronizedList(new ArrayList<>());
                        CountDownLatch latch = new CountDownLatch(100);
 
-                       try (ReactiveMessagePipeline reactiveMessagePipeline = 
reactivePulsarClient
-                                       
.messagePipeline(reactivePulsarClient.messageConsumer(Schema.STRING).subscriptionName("sub")
-                                                       
.topic(topicName).build())
+                       try (ReactiveMessagePipeline reactiveMessagePipeline = 
reactivePulsarClient.messageConsumer(Schema.STRING)
+                                       
.subscriptionName("sub").topic(topicName).build().messagePipeline()
                                        .messageHandler((message) -> 
Mono.fromRunnable(() -> {
                                                
messages.add(message.getValue());
                                                latch.countDown();
@@ -111,8 +110,7 @@ public class ReactiveMessagePipelineE2ETest {
                                        .collect(Collectors.toList());
 
                        
ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<Integer> 
reactiveMessageHandlerBuilder = reactivePulsarClient
-                                       
.messagePipeline(reactivePulsarClient.messageConsumer(Schema.INT32).subscriptionName("sub")
-                                                       
.topic(topicName).build())
+                                       
.messageConsumer(Schema.INT32).subscriptionName("sub").topic(topicName).build().messagePipeline()
                                        .messageHandler((message) -> {
                                                Mono<Void> messageHandler = 
Mono.fromRunnable(() -> {
                                                        Integer keyId = 
Integer.parseInt(message.getProperty("keyId"));
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 6cc70d3..16ccf2d 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -19,6 +19,7 @@ package org.apache.pulsar.reactive.client.api;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -29,6 +30,14 @@ public interface ReactiveMessageConsumer<T> {
 
        <R> Flux<R> consumeMany(Function<Flux<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler);
 
+       /**
+        * Creates a builder for building a {@link ReactiveMessagePipeline}.
+        * @return a builder for building a {@link ReactiveMessagePipeline}
+        */
+       default ReactiveMessagePipelineBuilder<T> messagePipeline() {
+               return 
ApiImplementationFactory.createReactiveMessageHandlerPipelineBuilder(this);
+       }
+
        /**
         * Creates the Pulsar Consumer and immediately closes it. This is 
useful for creating
         * the Pulsar subscription that is related to the consumer. Nothing 
happens unless the
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
index ed5d6fc..8bad89c 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
@@ -17,14 +17,12 @@
 package org.apache.pulsar.reactive.client.api;
 
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
 
 /**
  * Apache Pulsar Reactive Client interface
  *
  * Contains methods to create builders for {@link ReactiveMessageSender},
- * {@link ReactiveMessageReader} {@link ReactiveMessageConsumer} and
- * {@link ReactiveMessagePipeline} instances.
+ * {@link ReactiveMessageReader} and {@link ReactiveMessageConsumer} instances.
  *
  * @author Lari Hotari
  */
@@ -54,15 +52,4 @@ public interface ReactivePulsarClient {
         */
        <T> ReactiveMessageConsumerBuilder<T> messageConsumer(Schema<T> schema);
 
-       /**
-        * Creates a builder for building a {@link ReactiveMessagePipeline}.
-        * @param messageConsumer the {@link ReactiveMessageConsumer} instance 
to run in the
-        * pipeline
-        * @param <T> the message payload type
-        * @return a builder for building a {@link ReactiveMessagePipeline}
-        */
-       default <T> ReactiveMessagePipelineBuilder<T> 
messagePipeline(ReactiveMessageConsumer<T> messageConsumer) {
-               return 
ApiImplementationFactory.createReactiveMessageHandlerPipelineBuilder(messageConsumer);
-       }
-
 }

Reply via email to