This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 555e4b2  Replace OneByOneMessagePipelineBuilder#concurrency by 
OneByOneMessagePipelineBuilder#concurrency(int) (#39)
555e4b2 is described below

commit 555e4b2f1dbf31d3ccc1bc66481d46b5b2cb1180
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Mon Nov 28 10:50:53 2022 +0100

    Replace OneByOneMessagePipelineBuilder#concurrency by 
OneByOneMessagePipelineBuilder#concurrency(int) (#39)
---
 .../reactive/client/adapter/ReactiveMessagePipelineE2ETest.java      | 2 +-
 .../pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java   | 4 +---
 .../client/internal/api/DefaultReactiveMessagePipelineBuilder.java   | 5 -----
 3 files changed, 2 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
 
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index a654a1a..5ea6d7b 100644
--- 
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ 
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -131,7 +131,7 @@ public class ReactiveMessagePipelineE2ETest {
                                                return messageHandler;
                                        });
                        if (messageOrderScenario != 
MessageOrderScenario.NO_PARALLEL) {
-                               
reactiveMessageHandlerBuilder.concurrent().concurrency(KEYS_COUNT).useKeyOrderedProcessing();
+                               
reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
                        }
                        try (ReactiveMessagePipeline reactiveMessagePipeline = 
reactiveMessageHandlerBuilder.build().start()) {
                                boolean latchCompleted = latch.await(5, 
TimeUnit.SECONDS);
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
index 86359a0..5d25555 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
@@ -45,7 +45,7 @@ public interface ReactiveMessagePipelineBuilder<T> {
 
                OneByOneMessagePipelineBuilder<T> 
errorLogger(BiConsumer<Message<T>, Throwable> errorLogger);
 
-               ConcurrentOneByOneMessagePipelineBuilder<T> concurrent();
+               ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int 
concurrency);
 
        }
 
@@ -55,8 +55,6 @@ public interface ReactiveMessagePipelineBuilder<T> {
 
                ConcurrentOneByOneMessagePipelineBuilder<T> 
groupOrderedProcessing(MessageGroupingFunction groupingFunction);
 
-               ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int 
concurrency);
-
                ConcurrentOneByOneMessagePipelineBuilder<T> maxInflight(int 
maxInflight);
 
        }
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
index 59358ec..66b21a4 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
@@ -101,11 +101,6 @@ class DefaultReactiveMessagePipelineBuilder<T>
                return this;
        }
 
-       @Override
-       public ConcurrentOneByOneMessagePipelineBuilder<T> concurrent() {
-               return this;
-       }
-
        @Override
        public ConcurrentOneByOneMessagePipelineBuilder<T> 
useKeyOrderedProcessing() {
                Objects.requireNonNull(KEY_ORDERED_GROUPING_FUNCTION,

Reply via email to