This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push: new 555e4b2 Replace OneByOneMessagePipelineBuilder#concurrency by OneByOneMessagePipelineBuilder#concurrency(int) (#39) 555e4b2 is described below commit 555e4b2f1dbf31d3ccc1bc66481d46b5b2cb1180 Author: Christophe Bornet <cbor...@hotmail.com> AuthorDate: Mon Nov 28 10:50:53 2022 +0100 Replace OneByOneMessagePipelineBuilder#concurrency by OneByOneMessagePipelineBuilder#concurrency(int) (#39) --- .../reactive/client/adapter/ReactiveMessagePipelineE2ETest.java | 2 +- .../pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java | 4 +--- .../client/internal/api/DefaultReactiveMessagePipelineBuilder.java | 5 ----- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java index a654a1a..5ea6d7b 100644 --- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java +++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java @@ -131,7 +131,7 @@ public class ReactiveMessagePipelineE2ETest { return messageHandler; }); if (messageOrderScenario != MessageOrderScenario.NO_PARALLEL) { - reactiveMessageHandlerBuilder.concurrent().concurrency(KEYS_COUNT).useKeyOrderedProcessing(); + reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing(); } try (ReactiveMessagePipeline reactiveMessagePipeline = reactiveMessageHandlerBuilder.build().start()) { boolean latchCompleted = latch.await(5, TimeUnit.SECONDS); diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java index 86359a0..5d25555 100644 --- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java +++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java @@ -45,7 +45,7 @@ public interface ReactiveMessagePipelineBuilder<T> { OneByOneMessagePipelineBuilder<T> errorLogger(BiConsumer<Message<T>, Throwable> errorLogger); - ConcurrentOneByOneMessagePipelineBuilder<T> concurrent(); + ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int concurrency); } @@ -55,8 +55,6 @@ public interface ReactiveMessagePipelineBuilder<T> { ConcurrentOneByOneMessagePipelineBuilder<T> groupOrderedProcessing(MessageGroupingFunction groupingFunction); - ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int concurrency); - ConcurrentOneByOneMessagePipelineBuilder<T> maxInflight(int maxInflight); } diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java index 59358ec..66b21a4 100644 --- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java +++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java @@ -101,11 +101,6 @@ class DefaultReactiveMessagePipelineBuilder<T> return this; } - @Override - public ConcurrentOneByOneMessagePipelineBuilder<T> concurrent() { - return this; - } - @Override public ConcurrentOneByOneMessagePipelineBuilder<T> useKeyOrderedProcessing() { Objects.requireNonNull(KEY_ORDERED_GROUPING_FUNCTION,