This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 271545e Move ReactivePulsarClient::messagePipeline to ReactiveMessageConsumer (#17)
271545e is described below
commit 271545e6fd414337096fd61b514f99da9f9ce6af
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 9 13:45:00 2022 +0100
Move ReactivePulsarClient::messagePipeline to ReactiveMessageConsumer (#17)
* Make ReactivePulsarClient::messagePipeline static
* Move ReactivePulsarClient::messagePipeline to ReactiveMessageConsumer
---
README.adoc | 14 +++++++-------
.../client/adapter/ReactiveMessagePipelineE2ETest.java | 8 +++-----
.../reactive/client/api/ReactiveMessageConsumer.java | 9 +++++++++
.../pulsar/reactive/client/api/ReactivePulsarClient.java | 15 +--------------
4 files changed, 20 insertions(+), 26 deletions(-)
diff --git a/README.adoc b/README.adoc
index 2b884f2..28b109a 100644
--- a/README.adoc
+++ b/README.adoc
@@ -190,13 +190,13 @@ With `.endOfStreamAction(EndOfStreamAction.POLL)` the Reader will poll for new m
[source,java]
----
-ReactiveMessageHandler reactiveMessagePipeline=
- reactivePulsarClient
- .messagePipeline(reactivePulsarClient
- .messageConsumer(Schema.STRING)
- .subscriptionName("sub")
- .topic(topicName)
- .build())
+ReactiveMessageHandler reactiveMessagePipeline =
+ reactivePulsarClient
+ .messageConsumer(Schema.STRING)
+ .subscriptionName("sub")
+ .topic(topicName)
+ .build()
+ .messagePipeline()
.messageHandler(message -> Mono.fromRunnable(()->{
System.out.println(message.getValue());
}))
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 04c8c8d..a654a1a 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -73,9 +73,8 @@ public class ReactiveMessagePipelineE2ETest {
List<String> messages = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(100);
- try (ReactiveMessagePipeline reactiveMessagePipeline = reactivePulsarClient
- .messagePipeline(reactivePulsarClient.messageConsumer(Schema.STRING).subscriptionName("sub")
- .topic(topicName).build())
+ try (ReactiveMessagePipeline reactiveMessagePipeline = reactivePulsarClient.messageConsumer(Schema.STRING)
+ .subscriptionName("sub").topic(topicName).build().messagePipeline()
.messageHandler((message) -> Mono.fromRunnable(() -> {
messages.add(message.getValue());
latch.countDown();
@@ -111,8 +110,7 @@ public class ReactiveMessagePipelineE2ETest {
.collect(Collectors.toList());
ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<Integer> reactiveMessageHandlerBuilder = reactivePulsarClient
- .messagePipeline(reactivePulsarClient.messageConsumer(Schema.INT32).subscriptionName("sub")
- .topic(topicName).build())
+ .messageConsumer(Schema.INT32).subscriptionName("sub").topic(topicName).build().messagePipeline()
.messageHandler((message) -> {
Mono<Void> messageHandler = Mono.fromRunnable(() -> {
Integer keyId = Integer.parseInt(message.getProperty("keyId"));
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 6cc70d3..16ccf2d 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -19,6 +19,7 @@ package org.apache.pulsar.reactive.client.api;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -29,6 +30,14 @@ public interface ReactiveMessageConsumer<T> {
<R> Flux<R> consumeMany(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
+ /**
+ * Creates a builder for building a {@link ReactiveMessagePipeline}.
+ * @return a builder for building a {@link ReactiveMessagePipeline}
+ */
+ default ReactiveMessagePipelineBuilder<T> messagePipeline() {
+ return ApiImplementationFactory.createReactiveMessageHandlerPipelineBuilder(this);
+ }
+
/**
* Creates the Pulsar Consumer and immediately closes it. This is useful for creating
* the Pulsar subscription that is related to the consumer. Nothing happens unless the
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
index ed5d6fc..8bad89c 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
@@ -17,14 +17,12 @@
package org.apache.pulsar.reactive.client.api;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
/**
* Apache Pulsar Reactive Client interface
*
* Contains methods to create builders for {@link ReactiveMessageSender},
- * {@link ReactiveMessageReader} {@link ReactiveMessageConsumer} and
- * {@link ReactiveMessagePipeline} instances.
+ * {@link ReactiveMessageReader} and {@link ReactiveMessageConsumer} instances.
*
* @author Lari Hotari
*/
@@ -54,15 +52,4 @@ public interface ReactivePulsarClient {
*/
<T> ReactiveMessageConsumerBuilder<T> messageConsumer(Schema<T> schema);
- /**
- * Creates a builder for building a {@link ReactiveMessagePipeline}.
- * @param messageConsumer the {@link ReactiveMessageConsumer} instance to run in the
- * pipeline
- * @param <T> the message payload type
- * @return a builder for building a {@link ReactiveMessagePipeline}
- */
- default <T> ReactiveMessagePipelineBuilder<T> messagePipeline(ReactiveMessageConsumer<T> messageConsumer) {
- return ApiImplementationFactory.createReactiveMessageHandlerPipelineBuilder(messageConsumer);
- }
-
}