This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new e2e06c0  Some cleanups (#79)
e2e06c0 is described below

commit e2e06c049653a975132e65a33b771c10621d5d0f
Author: Christophe Bornet <[email protected]>
AuthorDate: Fri Dec 2 16:38:41 2022 +0100

    Some cleanups (#79)
---
 .../adapter/ReactiveMessagePipelineE2ETest.java    |  6 ++--
 .../adapter/ReactiveMessageSenderE2ETest.java      | 42 +++++++++++-----------
 .../AdaptedReactiveMessageConsumerBuilder.java     |  2 +-
 .../ConcurrentHashMapProducerCacheProvider.java    |  4 +--
 .../client/internal/adapter/ProducerCache.java     | 10 ++----
 .../internal/adapter/ProducerCacheEntry.java       | 10 ++----
 .../client/internal/adapter/ProducerCacheKey.java  |  4 ---
 .../internal/adapter/PulsarFutureAdapter.java      |  2 +-
 .../adapter/ReactiveConsumerAdapterFactory.java    |  2 +-
 .../internal/adapter/ReactiveProducerAdapter.java  |  4 +--
 .../adapter/ReactivePulsarResourceAdapter.java     |  8 -----
 .../adapter/ReactiveReaderAdapterFactory.java      |  2 +-
 .../reactive/client/api/MessageSpecBuilder.java    |  2 +-
 .../client/api/ReactiveMessageConsumerBuilder.java | 26 +++++++-------
 .../reactive/client/api/ReactivePulsarClient.java  |  4 +--
 .../internal/api/ApiImplementationFactory.java     |  2 +-
 .../internal/api/DefaultMessageSpecBuilder.java    |  2 +-
 .../api/GroupOrderedMessageProcessors.java         |  2 +-
 .../reactive/client/api/MessageSpecTest.java       |  3 +-
 19 files changed, 59 insertions(+), 78 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 25dd1f1..3914961 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -73,13 +73,13 @@ class ReactiveMessagePipelineE2ETest {
                        List<String> messages = 
Collections.synchronizedList(new ArrayList<>());
                        CountDownLatch latch = new CountDownLatch(100);
 
-                       try (ReactiveMessagePipeline reactiveMessagePipeline = 
reactivePulsarClient.messageConsumer(Schema.STRING)
+                       try (ReactiveMessagePipeline ignored = 
reactivePulsarClient.messageConsumer(Schema.STRING)
                                        
.subscriptionName("sub").topic(topicName).build().messagePipeline()
                                        .messageHandler((message) -> 
Mono.fromRunnable(() -> {
                                                
messages.add(message.getValue());
                                                latch.countDown();
                                        })).build().start()) {
-                               latch.await(5, TimeUnit.SECONDS);
+                               assertThat(latch.await(5, 
TimeUnit.SECONDS)).isTrue();
                                assertThat(messages).isEqualTo(Flux.range(1, 
100).map(Object::toString).collectList().block());
                        }
                }
@@ -133,7 +133,7 @@ class ReactiveMessagePipelineE2ETest {
                        if (messageOrderScenario != 
MessageOrderScenario.NO_PARALLEL) {
                                
reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
                        }
-                       try (ReactiveMessagePipeline reactiveMessagePipeline = 
reactiveMessageHandlerBuilder.build().start()) {
+                       try (ReactiveMessagePipeline ignored = 
reactiveMessageHandlerBuilder.build().start()) {
                                boolean latchCompleted = latch.await(5, 
TimeUnit.SECONDS);
                                assertThat(latchCompleted).as("processing of 
all messages should have completed").isTrue();
                                for (int i = 1; i <= KEYS_COUNT; i++) {
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 9a43150..bc40351 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -56,19 +56,20 @@ class ReactiveMessageSenderE2ETest {
        void shouldSendMessageToTopic() throws PulsarClientException {
                try (PulsarClient pulsarClient = 
SingletonPulsarContainer.createPulsarClient()) {
                        String topicName = "test" + UUID.randomUUID();
-                       Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
-                                       .subscribe();
+                       try (Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                                       .subscriptionName("sub").subscribe()) {
 
-                       ReactivePulsarClient reactivePulsarClient = 
AdaptedReactivePulsarClientFactory.create(pulsarClient);
+                               ReactivePulsarClient reactivePulsarClient = 
AdaptedReactivePulsarClientFactory.create(pulsarClient);
 
-                       ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
-                                       
.topic(topicName).maxInflight(1).build();
-                       MessageId messageId = 
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
-                       assertThat(messageId).isNotNull();
+                               ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
+                                               
.topic(topicName).maxInflight(1).build();
+                               MessageId messageId = 
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+                               assertThat(messageId).isNotNull();
 
-                       Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
-                       assertThat(message).isNotNull();
-                       assertThat(message.getValue()).isEqualTo("Hello 
world!");
+                               Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
+                               assertThat(message).isNotNull();
+                               assertThat(message.getValue()).isEqualTo("Hello 
world!");
+                       }
                }
        }
 
@@ -79,19 +80,20 @@ class ReactiveMessageSenderE2ETest {
                try (PulsarClient pulsarClient = 
SingletonPulsarContainer.createPulsarClient();
                                ReactiveMessageSenderCache producerCache = 
cacheInstance) {
                        String topicName = "test" + UUID.randomUUID();
-                       Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
-                                       .subscribe();
+                       try (Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                                       .subscriptionName("sub").subscribe()) {
 
-                       ReactivePulsarClient reactivePulsarClient = 
AdaptedReactivePulsarClientFactory.create(pulsarClient);
+                               ReactivePulsarClient reactivePulsarClient = 
AdaptedReactivePulsarClientFactory.create(pulsarClient);
 
-                       ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
-                                       
.cache(producerCache).maxInflight(1).topic(topicName).build();
-                       MessageId messageId = 
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
-                       assertThat(messageId).isNotNull();
+                               ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
+                                               
.cache(producerCache).maxInflight(1).topic(topicName).build();
+                               MessageId messageId = 
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+                               assertThat(messageId).isNotNull();
 
-                       Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
-                       assertThat(message).isNotNull();
-                       assertThat(message.getValue()).isEqualTo("Hello 
world!");
+                               Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
+                               assertThat(message).isNotNull();
+                               assertThat(message.getValue()).isEqualTo("Hello 
world!");
+                       }
                }
        }
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
index 7df510f..df525f9 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
@@ -55,7 +55,7 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements ReactiveMessageConsume
 
        @Override
        public ReactiveMessageConsumer<T> build() {
-               return new 
AdaptedReactiveMessageConsumer<T>(this.reactiveConsumerAdapterFactory, 
this.schema,
+               return new 
AdaptedReactiveMessageConsumer<>(this.reactiveConsumerAdapterFactory, 
this.schema,
                                toImmutableSpec());
        }
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
index 9b04dbc..beef4c4 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
@@ -44,10 +44,10 @@ public class ConcurrentHashMapProducerCacheProvider implements ProducerCacheProv
        }
 
        @Override
-       public void close() throws Exception {
+       public void close() {
                for (CompletableFuture<Object> future : this.cache.values()) {
                        future.thenAccept((value) -> {
-                               if (value != null && value instanceof 
AutoCloseable) {
+                               if (value instanceof AutoCloseable) {
                                        try {
                                                ((AutoCloseable) value).close();
                                        }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
index a0a46b8..c538437 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
@@ -24,15 +24,11 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
 import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class ProducerCache implements ReactiveMessageSenderCache {
 
-       private static final Logger log = 
LoggerFactory.getLogger(ProducerCache.class);
-
        private final ProducerCacheProvider cacheProvider;
 
        ProducerCache(ProducerCacheProvider cacheProvider) {
@@ -58,7 +54,7 @@ class ProducerCache implements ReactiveMessageSenderCache {
                return Mono.usingWhen(this.leaseCacheEntry(cacheKey, 
producerMono, producerActionTransformer),
                                (producerCacheEntry) -> 
usingProducerAction.apply(producerCacheEntry.getProducer())
                                                
.as(producerCacheEntry::decorateProducerAction),
-                               (producerCacheEntry) -> 
this.returnCacheEntry(producerCacheEntry));
+                               this::returnCacheEntry);
        }
 
        private Mono<Object> returnCacheEntry(ProducerCacheEntry 
producerCacheEntry) {
@@ -77,12 +73,12 @@ class ProducerCache implements ReactiveMessageSenderCache {
                return Flux.usingWhen(this.leaseCacheEntry(cacheKey, 
producerMono, producerActionTransformer),
                                (producerCacheEntry) -> 
usingProducerAction.apply(producerCacheEntry.getProducer())
                                                
.as(producerCacheEntry::decorateProducerAction),
-                               (producerCacheEntry) -> 
this.returnCacheEntry(producerCacheEntry));
+                               this::returnCacheEntry);
        }
 
        @Override
        public void close() throws Exception {
-               if (this.cacheProvider instanceof AutoCloseable) {
+               if (this.cacheProvider != null) {
                        this.cacheProvider.close();
                }
        }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
index 12f4ced..1f185c1 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
@@ -33,9 +33,9 @@ class ProducerCacheEntry implements AutoCloseable {
 
        private static final Logger log = 
LoggerFactory.getLogger(ProducerCacheEntry.class);
 
-       private final AtomicReference<Producer<?>> producer = new 
AtomicReference();
+       private final AtomicReference<Producer<?>> producer = new 
AtomicReference<>();
 
-       private final AtomicReference<Mono<? extends Producer<?>>> 
producerCreator = new AtomicReference();
+       private final AtomicReference<Mono<? extends Producer<?>>> 
producerCreator = new AtomicReference<>();
 
        private final AtomicInteger activeLeases = new AtomicInteger(0);
 
@@ -69,10 +69,6 @@ class ProducerCacheEntry implements AutoCloseable {
                }
        }
 
-       int getActiveLeases() {
-               return this.activeLeases.get();
-       }
-
        <T> Producer<T> getProducer() {
                return (Producer<T>) this.producer.get();
        }
@@ -92,7 +88,7 @@ class ProducerCacheEntry implements AutoCloseable {
                                        }
                                }
                        }
-                       return Mono.defer(() -> 
this.producerCreator.get()).filter(Producer::isConnected)
+                       return 
Mono.defer(this.producerCreator::get).filter(Producer::isConnected)
                                        .repeatWhenEmpty(5, (flux) -> 
flux.delayElements(Duration.ofSeconds(1))).thenReturn(this);
                });
        }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
index 4d3686d..5427b72 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
@@ -37,10 +37,6 @@ final class ProducerCacheKey {
                this.schema = schema;
        }
 
-       String getTopicName() {
-               return (this.producerConfigurationData != null) ? 
this.producerConfigurationData.getTopicName() : null;
-       }
-
        @Override
        public boolean equals(Object o) {
                if (this == o) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
index 3dfb87c..747e3b8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
@@ -27,7 +27,7 @@ import reactor.core.publisher.Mono;
  * Stateful adapter from CompletableFuture to Mono which keeps a reference to 
the original
  * future so that it can be cancelled. Cancellation is necessary for some 
cases to release
  * resources.
- *
+ * <p>
  * There's additional logic to ignore Pulsar client's
  * {@link 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException} when
  * the Mono has been cancelled. This is to reduce unnecessary exceptions in 
logs.
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
index 5f45ccd..2b1f1dc 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveConsumerAdapterFactory {
        }
 
        <T> ReactiveConsumerAdapter<T> create(Function<PulsarClient, 
ConsumerBuilder<T>> consumerBuilderFactory) {
-               return new 
ReactiveConsumerAdapter<T>(this.pulsarClientSupplier, consumerBuilderFactory);
+               return new ReactiveConsumerAdapter<>(this.pulsarClientSupplier, 
consumerBuilderFactory);
        }
 
 }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index d3e8f31..1ecacf8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -80,7 +80,7 @@ class ReactiveProducerAdapter<T> {
 
        private <R> Mono<R> usingUncachedProducer(Function<Producer<T>, 
Mono<R>> usingProducerAction) {
                return Mono.usingWhen(createProducerMono(),
-                               (producer) -> Mono.using(() -> 
this.producerActionTransformer.get(),
+                               (producer) -> 
Mono.using(this.producerActionTransformer::get,
                                                (transformer) -> 
usingProducerAction.apply(producer)
                                                                .as((mono) -> 
Mono.from(transformer.transform(mono))),
                                                Disposable::dispose),
@@ -106,7 +106,7 @@ class ReactiveProducerAdapter<T> {
        }
 
        private <R> Flux<R> usingUncachedProducerMany(Function<Producer<T>, 
Flux<R>> usingProducerAction) {
-               return Flux.usingWhen(createProducerMono(), (producer) -> 
Flux.using(() -> this.producerActionTransformer.get(),
+               return Flux.usingWhen(createProducerMono(), (producer) -> 
Flux.using(this.producerActionTransformer::get,
                                (transformer) -> 
usingProducerAction.apply(producer).as(transformer::transform), 
Disposable::dispose),
                                this::closeProducer);
        }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
index f3c1f5f..43b7522 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
@@ -42,14 +42,6 @@ class ReactivePulsarResourceAdapter {
                };
        }
 
-       static ReactivePulsarResourceAdapter create(Supplier<PulsarClient> 
pulsarClientSupplier) {
-               return new ReactivePulsarResourceAdapter(pulsarClientSupplier);
-       }
-
-       static ReactivePulsarResourceAdapter create(PulsarClient pulsarClient) {
-               return create(() -> pulsarClient);
-       }
-
        ReactiveProducerAdapterFactory producer() {
                return new 
ReactiveProducerAdapterFactory(this.pulsarClientSupplier);
        }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
index c9dd6d1..1eb3a3a 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveReaderAdapterFactory {
        }
 
        <T> ReactiveReaderAdapter<T> create(Function<PulsarClient, 
ReaderBuilder<T>> readerBuilderFactory) {
-               return new ReactiveReaderAdapter<T>(this.pulsarClientSupplier, 
readerBuilderFactory);
+               return new ReactiveReaderAdapter<>(this.pulsarClientSupplier, 
readerBuilderFactory);
        }
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
index d49fb11..fff4967 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
@@ -127,7 +127,7 @@ public interface MessageSpecBuilder<T> {
         *
         * <p>
         * The timestamp is milliseconds and based on UTC (eg:
-        * {@link System#currentTimeMillis()}.
+        * {@link System#currentTimeMillis()}).
         *
         * <p>
         * <b>Note</b>: messages are only delivered with delay when a consumer 
is consuming
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index 7e9f960..07b1d22 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -292,7 +292,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
         * Sets the consumer name.
         *
         * <p>
-        * Consumer name is informative and it can be used to indentify a 
particular consumer
+        * Consumer name is informative and it can be used to identify a 
particular consumer
         * instance from the topic stats.
         * @param consumerName the consumer name
         * @return the consumer builder instance
@@ -341,7 +341,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
 
        /**
         * Sets the priority level for the consumer.
-        *
+        * <p>
         * <b>Shared subscription</b> Sets the priority level for the shared 
subscription
         * consumers to which the broker gives more priority while dispatching 
messages. Here,
         * the broker follows descending priorities. (eg: 0=max-priority, 1, 
2,..)
@@ -430,13 +430,13 @@ public interface ReactiveMessageConsumerBuilder<T> {
         * The timeout needs to be greater than 1 second.
         *
         * <p>
-        * By default, the acknowledge timeout is disabled and that means that 
messages
+        * By default, the acknowledgement timeout is disabled and that means 
that messages
         * delivered to a consumer will not be re-delivered unless the consumer 
crashes.
         *
         * <p>
-        * When enabling the acknowledge timeout, if a message is not 
acknowledged within the
-        * specified timeout it will be re-delivered to the consumer (possibly 
to a different
-        * consumer in case of a shared subscription).
+        * When enabling the acknowledgement timeout, if a message is not 
acknowledged within
+        * the specified timeout it will be re-delivered to the consumer 
(possibly to a
+        * different consumer in case of a shared subscription).
         * @param ackTimeout the timeout for unacknowledged messages.
         * @return the consumer builder instance
         * @see ConsumerBuilder#ackTimeout(long, TimeUnit)
@@ -450,10 +450,10 @@ public interface ReactiveMessageConsumerBuilder<T> {
         * Sets the granularity of the ack-timeout redelivery.
         *
         * <p>
-        * By default, the tick time is set to 1 second. Using an higher tick 
time will reduce
+        * By default, the tick time is set to 1 second. Using a higher tick 
time will reduce
         * the memory overhead to track messages when the ack-timeout is set to 
bigger values
         * (eg: 1hour).
-        * @param ackTimeoutTickTime the minimum precision for the acknowledge 
timeout
+        * @param ackTimeoutTickTime the minimum precision for the 
acknowledgement timeout
         * messages tracker
         * @return the consumer builder instance
         * @see ConsumerBuilder#ackTimeoutTickTime(long, TimeUnit)
@@ -528,7 +528,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
         * Sets a dead letter policy for the consumer.
         *
         * <p>
-        * By default messages are redelivered indefinitely if they are not 
acknowledged. By
+        * By default, messages are redelivered indefinitely if they are not 
acknowledged. By
         * using a dead letter mechanism, messages that have reached the max 
redelivery count
         * will be acknowledged automatically and send to the configured dead 
letter topic.
         *
@@ -539,7 +539,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
         *          
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
         *          .build();
         * </pre> Default the dead letter topic name is 
{TopicName}-{Subscription}-DLQ. You
-        * can set o set a custom dead letter topic name like this: <pre>
+        * can set a custom dead letter topic name like this: <pre>
         * client.messageConsumer(Schema.BYTES)
         *          .deadLetterPolicy(DeadLetterPolicy
         *              .builder()
@@ -604,7 +604,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
        }
 
        /**
-        * Sets the maximum total receiver queue size across partitons.
+        * Sets the maximum total receiver queue size across partitions.
         *
         * <p>
         * This setting is used to reduce the receiver queue size for 
individual partitions
@@ -685,8 +685,8 @@ public interface ReactiveMessageConsumerBuilder<T> {
         * </pre> Buffering large number of outstanding uncompleted chunked 
messages can
         * create memory pressure. It can be guarded by providing a
         * {@code maxPendingChunkedMessage} threshold. Once the consumer 
reaches this
-        * threshold, it drops the outstanding unchunked-messages by silently 
acknowledging or
-        * asking the broker to redeliver later by marking it unacknowledged. 
This behavior
+        * threshold, it drops the outstanding non-chunked messages by silently 
acknowledging
+        * or asking the broker to redeliver later by marking it 
unacknowledged. This behavior
         * can be controlled by setting {@link 
#autoAckOldestChunkedMessageOnQueueFull} The
         * default value is 10.
         * @param maxPendingChunkedMessage the maximum pending chunked messages.
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
index 9a9d248..882a0ed 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
@@ -19,8 +19,8 @@ package org.apache.pulsar.reactive.client.api;
 import org.apache.pulsar.client.api.Schema;
 
 /**
- * Apache Pulsar Reactive Client interface
- *
+ * Apache Pulsar Reactive Client interface.
+ * <p>
  * Contains methods to create builders for {@link ReactiveMessageSender},
  * {@link ReactiveMessageReader} and {@link ReactiveMessageConsumer} instances.
  */
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
index 95285b2..38a3ac3 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
@@ -53,7 +53,7 @@ public final class ApiImplementationFactory {
         * @return the result of the message processing
         */
        public static <T> MessageResult<T> negativeAcknowledge(MessageId 
messageId, T value) {
-               return new DefaultMessageResult<T>(messageId, false, value);
+               return new DefaultMessageResult<>(messageId, false, value);
        }
 
        /**
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
index 7910116..eb19e99 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
@@ -139,7 +139,7 @@ class DefaultMessageSpecBuilder<T> implements MessageSpecBuilder<T> {
 
        @Override
        public MessageSpec<T> build() {
-               return new DefaultMessageSpec<T>(this.key, this.orderingKey, 
this.keyBytes, this.value, this.properties,
+               return new DefaultMessageSpec<>(this.key, this.orderingKey, 
this.keyBytes, this.value, this.properties,
                                this.eventTime, this.sequenceId, 
this.replicationClusters, this.disableReplication, this.deliverAt,
                                this.deliverAfterDelay, this.deliverAfterUnit);
        }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
index 5e30171..638b116 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
@@ -29,7 +29,7 @@ import reactor.util.concurrent.Queues;
 /**
  * Functions for implementing In-order parallel processing for Pulsar messages 
using
  * Project Reactor.
- *
+ * <p>
  * A processing group is resolved for each message based on the message's key. 
The message
  * flux is split into group fluxes based on the processing group. Each group 
flux is
  * processes messages in order (one-by-one). Multiple group fluxes are 
processed in
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
index 206a6f3..b7e35d7 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.junit.jupiter.api.Test;
@@ -101,7 +100,7 @@ class MessageSpecTest {
                private Duration deliverAfter;
 
                @Override
-               public MessageId send() throws PulsarClientException {
+               public MessageId send() {
                        throw new IllegalStateException("not implemented");
                }
 

Reply via email to