This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d92151  Add javadoc to ReactiveMessagePipelineBuilder (#43)
0d92151 is described below

commit 0d92151116b6352db8771c6238e244f7a3d818bc
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Nov 28 11:49:51 2022 +0100

    Add javadoc to ReactiveMessagePipelineBuilder (#43)
---
 .../client/api/ReactiveMessagePipelineBuilder.java | 83 ++++++++++++++++++++++
 1 file changed, 83 insertions(+)

diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
index 5d25555..f4a6d55 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
@@ -26,35 +26,118 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
+/**
+ * Builder interface for {@link ReactiveMessagePipeline}.
+ *
+ * @param <T> the message payload type
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
 public interface ReactiveMessagePipelineBuilder<T> {
 
+       /**
+        * Sets a handler function that processes messages one-by-one.
+        * @param messageHandler a function that takes a message as input and 
returns an empty
+        * Publisher
+        * @return a builder for the pipeline handling messages one-by-one
+        */
        OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, 
Publisher<Void>> messageHandler);
 
+       /**
+        * Sets a handler function that processes the stream of messages.
+        * @param streamingMessageHandler a function that takes a stream of 
messages as input
+        * and returns a publisher of {@link MessageResult}s, each containing the 
acknowledgement or negative
+        * acknowledgement value of the processing.
+        * @return the pipeline builder instance
+        */
        ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
                        Function<Flux<Message<T>>, 
Publisher<MessageResult<Void>>> streamingMessageHandler);
 
+       /**
+        * Sets a transform function that can be used to customize the pipeline.
+        * @param transformer a transform function from {@code Mono<Void>} to {@code Publisher<Void>}
+        * @return the pipeline builder instance
+        * @see Mono#transform(Function)
+        */
        ReactiveMessagePipelineBuilder<T> 
transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer);
 
+       /**
+        * Sets a retry spec that will be used in case of failures in the 
pipeline. The
+        * default is to retry indefinitely with an exponential backoff.
+        * @param pipelineRetrySpec the retry spec
+        * @return the pipeline builder instance
+        * @see Mono#retryWhen(Retry)
+        */
        ReactiveMessagePipelineBuilder<T> pipelineRetrySpec(Retry 
pipelineRetrySpec);
 
+       /**
+        * Builds the pipeline instance.
+        * @return the pipeline instance
+        */
        ReactiveMessagePipeline build();
 
+       /**
+        * Builder interface for a pipeline that handles messages one-by-one.
+        *
+        * @param <T> the message payload type
+        * @see #messageHandler(Function)
+        */
        interface OneByOneMessagePipelineBuilder<T> extends 
ReactiveMessagePipelineBuilder<T> {
 
+               /**
+                * Sets the timeout for the message handler function. Defaults 
to 2 minutes.
+                * @param handlingTimeout the handling timeout value
+                * @return the pipeline builder instance
+                * @see #messageHandler(Function)
+                */
                OneByOneMessagePipelineBuilder<T> handlingTimeout(Duration 
handlingTimeout);
 
+               /**
+                * Sets a function which will be called when the message 
handler emits an error.
+                * @param errorLogger the error logger function, which accepts the message and the error
+                * @return the pipeline builder instance
+                */
                OneByOneMessagePipelineBuilder<T> 
errorLogger(BiConsumer<Message<T>, Throwable> errorLogger);
 
+               /**
+                * Sets the concurrency for the pipeline. The messages will be 
dispatched to
+                * concurrent instances of the message handler.
+                * @param concurrency the number of concurrent message handlers
+                * @return a concurrent pipeline builder instance
+                */
                ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int 
concurrency);
 
        }
 
+       /**
+        * Builder interface for a pipeline that handles messages with 
concurrent one-by-one
+        * message handlers.
+        *
+        * @param <T> the message payload type
+        * @see #concurrency(int)
+        */
        interface ConcurrentOneByOneMessagePipelineBuilder<T> extends 
OneByOneMessagePipelineBuilder<T> {
 
+               /**
+                * Specifies that messages with the same key should be sent in 
order to the same
+                * message handler.
+                * @return the pipeline builder instance
+                */
                ConcurrentOneByOneMessagePipelineBuilder<T> 
useKeyOrderedProcessing();
 
+               /**
+                * Sets a function to group messages to be sent to the same 
message handler.
+                * @param groupingFunction the function used to group the 
messages
+                * @return the pipeline builder instance
+                */
                ConcurrentOneByOneMessagePipelineBuilder<T> 
groupOrderedProcessing(MessageGroupingFunction groupingFunction);
 
+               /**
+                * Sets a global limit to the number of messages in-flight over 
all the concurrent
+                * handlers.
+                * @param maxInflight the maximum in-flight messages
+                * @return the pipeline builder instance
+                */
                ConcurrentOneByOneMessagePipelineBuilder<T> maxInflight(int 
maxInflight);
 
        }

Reply via email to