This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 0d92151 Add javadoc to ReactiveMessagePipelineBuilder (#43)
0d92151 is described below
commit 0d92151116b6352db8771c6238e244f7a3d818bc
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Nov 28 11:49:51 2022 +0100
Add javadoc to ReactiveMessagePipelineBuilder (#43)
---
.../client/api/ReactiveMessagePipelineBuilder.java | 83 ++++++++++++++++++++++
1 file changed, 83 insertions(+)
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
index 5d25555..f4a6d55 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
@@ -26,35 +26,118 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
+/**
+ * Builder interface for {@link ReactiveMessagePipeline}.
+ *
+ * @param <T> the message payload type
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
public interface ReactiveMessagePipelineBuilder<T> {
+ /**
+ * Sets a handler function that processes messages one-by-one.
+ * @param messageHandler a function that takes a message as input and
returns an empty
+ * Publisher
+ * @return a builder for the pipeline handling messages one-by-one
+ */
OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>,
Publisher<Void>> messageHandler);
+ /**
+ * Sets a handler function that processes the stream of messages.
+ * @param streamingMessageHandler a function that takes a stream of
messages as input
+ * and returns a {@link MessageResult} that contains the
acknowledgement or negative
+ * acknowledgement value of the processing.
+ * @return the pipeline builder instance
+ */
ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
Function<Flux<Message<T>>,
Publisher<MessageResult<Void>>> streamingMessageHandler);
+ /**
+ * Sets a transform function that can be used to customize the pipeline.
+ * @param transformer a transform function
+ * @return the pipeline builder instance
+ * @see Mono#transform(Function)
+ */
ReactiveMessagePipelineBuilder<T>
transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer);
+ /**
+ * Sets a retry spec that will be used in case of failures in the
pipeline. The
+ * default is to retry indefinitely with an exponential backoff.
+ * @param pipelineRetrySpec the retry spec
+ * @return the pipeline builder instance
+ * @see Mono#retryWhen(Retry)
+ */
ReactiveMessagePipelineBuilder<T> pipelineRetrySpec(Retry
pipelineRetrySpec);
+ /**
+ * Builds the pipeline instance.
+ * @return the pipeline instance
+ */
ReactiveMessagePipeline build();
+ /**
+ * Builder interface for a pipeline that handles messages one-by-one.
+ *
+ * @param <T> the message payload type
+ * @see #messageHandler(Function)
+ */
interface OneByOneMessagePipelineBuilder<T> extends
ReactiveMessagePipelineBuilder<T> {
+ /**
+ * Sets the timeout for the message handler function. Defaults
to 2 minutes.
+ * @param handlingTimeout the handling timeout value
+ * @return the pipeline builder instance
+ * @see #messageHandler(Function)
+ */
OneByOneMessagePipelineBuilder<T> handlingTimeout(Duration
handlingTimeout);
+ /**
+ * Sets a function which will be called when the message
handler emits an error.
+ * @param errorLogger the error logger function
+ * @return the pipeline builder instance
+ */
OneByOneMessagePipelineBuilder<T>
errorLogger(BiConsumer<Message<T>, Throwable> errorLogger);
+ /**
+ * Sets the concurrency for the pipeline. The messages will be
dispatched to
+ * concurrent instances of the message handler.
+ * @param concurrency the number of concurrent message handlers
+ * @return a concurrent pipeline builder instance
+ */
ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int
concurrency);
}
+ /**
+ * Builder interface for a pipeline that handles messages with
concurrent one-by-one
+ * messages handlers.
+ *
+ * @param <T> the message payload type
+ * @see #concurrency(int)
+ */
interface ConcurrentOneByOneMessagePipelineBuilder<T> extends
OneByOneMessagePipelineBuilder<T> {
+ /**
+ * Sets whether messages with the same key should be sent in
order to the same
+ * message handler.
+ * @return the pipeline instance builder
+ */
ConcurrentOneByOneMessagePipelineBuilder<T>
useKeyOrderedProcessing();
+ /**
+ * Sets a function to group messages to be sent to the same
message handler.
+ * @param groupingFunction the function used to group the
messages
+ * @return the pipeline instance builder
+ */
ConcurrentOneByOneMessagePipelineBuilder<T>
groupOrderedProcessing(MessageGroupingFunction groupingFunction);
+ /**
+ * Sets a global limit to the number of messages in-flight over
all the concurrent
+ * handlers.
+ * @param maxInflight the maximum in-flight messages
+ * @return the pipeline instance builder
+ */
ConcurrentOneByOneMessagePipelineBuilder<T> maxInflight(int
maxInflight);
}