This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new e2e06c0 Some cleanups (#79)
e2e06c0 is described below
commit e2e06c049653a975132e65a33b771c10621d5d0f
Author: Christophe Bornet <[email protected]>
AuthorDate: Fri Dec 2 16:38:41 2022 +0100
Some cleanups (#79)
---
.../adapter/ReactiveMessagePipelineE2ETest.java | 6 ++--
.../adapter/ReactiveMessageSenderE2ETest.java | 42 +++++++++++-----------
.../AdaptedReactiveMessageConsumerBuilder.java | 2 +-
.../ConcurrentHashMapProducerCacheProvider.java | 4 +--
.../client/internal/adapter/ProducerCache.java | 10 ++----
.../internal/adapter/ProducerCacheEntry.java | 10 ++----
.../client/internal/adapter/ProducerCacheKey.java | 4 ---
.../internal/adapter/PulsarFutureAdapter.java | 2 +-
.../adapter/ReactiveConsumerAdapterFactory.java | 2 +-
.../internal/adapter/ReactiveProducerAdapter.java | 4 +--
.../adapter/ReactivePulsarResourceAdapter.java | 8 -----
.../adapter/ReactiveReaderAdapterFactory.java | 2 +-
.../reactive/client/api/MessageSpecBuilder.java | 2 +-
.../client/api/ReactiveMessageConsumerBuilder.java | 26 +++++++-------
.../reactive/client/api/ReactivePulsarClient.java | 4 +--
.../internal/api/ApiImplementationFactory.java | 2 +-
.../internal/api/DefaultMessageSpecBuilder.java | 2 +-
.../api/GroupOrderedMessageProcessors.java | 2 +-
.../reactive/client/api/MessageSpecTest.java | 3 +-
19 files changed, 59 insertions(+), 78 deletions(-)
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 25dd1f1..3914961 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -73,13 +73,13 @@ class ReactiveMessagePipelineE2ETest {
List<String> messages =
Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(100);
- try (ReactiveMessagePipeline reactiveMessagePipeline =
reactivePulsarClient.messageConsumer(Schema.STRING)
+ try (ReactiveMessagePipeline ignored =
reactivePulsarClient.messageConsumer(Schema.STRING)
.subscriptionName("sub").topic(topicName).build().messagePipeline()
.messageHandler((message) ->
Mono.fromRunnable(() -> {
messages.add(message.getValue());
latch.countDown();
})).build().start()) {
- latch.await(5, TimeUnit.SECONDS);
+ assertThat(latch.await(5,
TimeUnit.SECONDS)).isTrue();
assertThat(messages).isEqualTo(Flux.range(1,
100).map(Object::toString).collectList().block());
}
}
@@ -133,7 +133,7 @@ class ReactiveMessagePipelineE2ETest {
if (messageOrderScenario !=
MessageOrderScenario.NO_PARALLEL) {
reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
}
- try (ReactiveMessagePipeline reactiveMessagePipeline =
reactiveMessageHandlerBuilder.build().start()) {
+ try (ReactiveMessagePipeline ignored =
reactiveMessageHandlerBuilder.build().start()) {
boolean latchCompleted = latch.await(5,
TimeUnit.SECONDS);
assertThat(latchCompleted).as("processing of
all messages should have completed").isTrue();
for (int i = 1; i <= KEYS_COUNT; i++) {
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 9a43150..bc40351 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -56,19 +56,20 @@ class ReactiveMessageSenderE2ETest {
void shouldSendMessageToTopic() throws PulsarClientException {
try (PulsarClient pulsarClient =
SingletonPulsarContainer.createPulsarClient()) {
String topicName = "test" + UUID.randomUUID();
- Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
- .subscribe();
+ try (Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName("sub").subscribe()) {
- ReactivePulsarClient reactivePulsarClient =
AdaptedReactivePulsarClientFactory.create(pulsarClient);
+ ReactivePulsarClient reactivePulsarClient =
AdaptedReactivePulsarClientFactory.create(pulsarClient);
- ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
-
.topic(topicName).maxInflight(1).build();
- MessageId messageId =
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
- assertThat(messageId).isNotNull();
+ ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
+
.topic(topicName).maxInflight(1).build();
+ MessageId messageId =
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+ assertThat(messageId).isNotNull();
- Message<String> message = consumer.receive(1,
TimeUnit.SECONDS);
- assertThat(message).isNotNull();
- assertThat(message.getValue()).isEqualTo("Hello
world!");
+ Message<String> message = consumer.receive(1,
TimeUnit.SECONDS);
+ assertThat(message).isNotNull();
+ assertThat(message.getValue()).isEqualTo("Hello
world!");
+ }
}
}
@@ -79,19 +80,20 @@ class ReactiveMessageSenderE2ETest {
try (PulsarClient pulsarClient =
SingletonPulsarContainer.createPulsarClient();
ReactiveMessageSenderCache producerCache =
cacheInstance) {
String topicName = "test" + UUID.randomUUID();
- Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
- .subscribe();
+ try (Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName("sub").subscribe()) {
- ReactivePulsarClient reactivePulsarClient =
AdaptedReactivePulsarClientFactory.create(pulsarClient);
+ ReactivePulsarClient reactivePulsarClient =
AdaptedReactivePulsarClientFactory.create(pulsarClient);
- ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
-
.cache(producerCache).maxInflight(1).topic(topicName).build();
- MessageId messageId =
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
- assertThat(messageId).isNotNull();
+ ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
+
.cache(producerCache).maxInflight(1).topic(topicName).build();
+ MessageId messageId =
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+ assertThat(messageId).isNotNull();
- Message<String> message = consumer.receive(1,
TimeUnit.SECONDS);
- assertThat(message).isNotNull();
- assertThat(message.getValue()).isEqualTo("Hello
world!");
+ Message<String> message = consumer.receive(1,
TimeUnit.SECONDS);
+ assertThat(message).isNotNull();
+ assertThat(message.getValue()).isEqualTo("Hello
world!");
+ }
}
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
index 7df510f..df525f9 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
@@ -55,7 +55,7 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements
ReactiveMessageConsume
@Override
public ReactiveMessageConsumer<T> build() {
- return new
AdaptedReactiveMessageConsumer<T>(this.reactiveConsumerAdapterFactory,
this.schema,
+ return new
AdaptedReactiveMessageConsumer<>(this.reactiveConsumerAdapterFactory,
this.schema,
toImmutableSpec());
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
index 9b04dbc..beef4c4 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
@@ -44,10 +44,10 @@ public class ConcurrentHashMapProducerCacheProvider
implements ProducerCacheProv
}
@Override
- public void close() throws Exception {
+ public void close() {
for (CompletableFuture<Object> future : this.cache.values()) {
future.thenAccept((value) -> {
- if (value != null && value instanceof
AutoCloseable) {
+ if (value instanceof AutoCloseable) {
try {
((AutoCloseable) value).close();
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
index a0a46b8..c538437 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
@@ -24,15 +24,11 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class ProducerCache implements ReactiveMessageSenderCache {
- private static final Logger log =
LoggerFactory.getLogger(ProducerCache.class);
-
private final ProducerCacheProvider cacheProvider;
ProducerCache(ProducerCacheProvider cacheProvider) {
@@ -58,7 +54,7 @@ class ProducerCache implements ReactiveMessageSenderCache {
return Mono.usingWhen(this.leaseCacheEntry(cacheKey,
producerMono, producerActionTransformer),
(producerCacheEntry) ->
usingProducerAction.apply(producerCacheEntry.getProducer())
.as(producerCacheEntry::decorateProducerAction),
- (producerCacheEntry) ->
this.returnCacheEntry(producerCacheEntry));
+ this::returnCacheEntry);
}
private Mono<Object> returnCacheEntry(ProducerCacheEntry
producerCacheEntry) {
@@ -77,12 +73,12 @@ class ProducerCache implements ReactiveMessageSenderCache {
return Flux.usingWhen(this.leaseCacheEntry(cacheKey,
producerMono, producerActionTransformer),
(producerCacheEntry) ->
usingProducerAction.apply(producerCacheEntry.getProducer())
.as(producerCacheEntry::decorateProducerAction),
- (producerCacheEntry) ->
this.returnCacheEntry(producerCacheEntry));
+ this::returnCacheEntry);
}
@Override
public void close() throws Exception {
- if (this.cacheProvider instanceof AutoCloseable) {
+ if (this.cacheProvider != null) {
this.cacheProvider.close();
}
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
index 12f4ced..1f185c1 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
@@ -33,9 +33,9 @@ class ProducerCacheEntry implements AutoCloseable {
private static final Logger log =
LoggerFactory.getLogger(ProducerCacheEntry.class);
- private final AtomicReference<Producer<?>> producer = new
AtomicReference();
+ private final AtomicReference<Producer<?>> producer = new
AtomicReference<>();
- private final AtomicReference<Mono<? extends Producer<?>>>
producerCreator = new AtomicReference();
+ private final AtomicReference<Mono<? extends Producer<?>>>
producerCreator = new AtomicReference<>();
private final AtomicInteger activeLeases = new AtomicInteger(0);
@@ -69,10 +69,6 @@ class ProducerCacheEntry implements AutoCloseable {
}
}
- int getActiveLeases() {
- return this.activeLeases.get();
- }
-
<T> Producer<T> getProducer() {
return (Producer<T>) this.producer.get();
}
@@ -92,7 +88,7 @@ class ProducerCacheEntry implements AutoCloseable {
}
}
}
- return Mono.defer(() ->
this.producerCreator.get()).filter(Producer::isConnected)
+ return
Mono.defer(this.producerCreator::get).filter(Producer::isConnected)
.repeatWhenEmpty(5, (flux) ->
flux.delayElements(Duration.ofSeconds(1))).thenReturn(this);
});
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
index 4d3686d..5427b72 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
@@ -37,10 +37,6 @@ final class ProducerCacheKey {
this.schema = schema;
}
- String getTopicName() {
- return (this.producerConfigurationData != null) ?
this.producerConfigurationData.getTopicName() : null;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
index 3dfb87c..747e3b8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
@@ -27,7 +27,7 @@ import reactor.core.publisher.Mono;
* Stateful adapter from CompletableFuture to Mono which keeps a reference to
the original
* future so that it can be cancelled. Cancellation is necessary for some
cases to release
* resources.
- *
+ * <p>
* There's additional logic to ignore Pulsar client's
* {@link
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException} when
* the Mono has been cancelled. This is to reduce unnecessary exceptions in
logs.
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
index 5f45ccd..2b1f1dc 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveConsumerAdapterFactory {
}
<T> ReactiveConsumerAdapter<T> create(Function<PulsarClient,
ConsumerBuilder<T>> consumerBuilderFactory) {
- return new
ReactiveConsumerAdapter<T>(this.pulsarClientSupplier, consumerBuilderFactory);
+ return new ReactiveConsumerAdapter<>(this.pulsarClientSupplier,
consumerBuilderFactory);
}
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index d3e8f31..1ecacf8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -80,7 +80,7 @@ class ReactiveProducerAdapter<T> {
private <R> Mono<R> usingUncachedProducer(Function<Producer<T>,
Mono<R>> usingProducerAction) {
return Mono.usingWhen(createProducerMono(),
- (producer) -> Mono.using(() ->
this.producerActionTransformer.get(),
+ (producer) ->
Mono.using(this.producerActionTransformer::get,
(transformer) ->
usingProducerAction.apply(producer)
.as((mono) ->
Mono.from(transformer.transform(mono))),
Disposable::dispose),
@@ -106,7 +106,7 @@ class ReactiveProducerAdapter<T> {
}
private <R> Flux<R> usingUncachedProducerMany(Function<Producer<T>,
Flux<R>> usingProducerAction) {
- return Flux.usingWhen(createProducerMono(), (producer) ->
Flux.using(() -> this.producerActionTransformer.get(),
+ return Flux.usingWhen(createProducerMono(), (producer) ->
Flux.using(this.producerActionTransformer::get,
(transformer) ->
usingProducerAction.apply(producer).as(transformer::transform),
Disposable::dispose),
this::closeProducer);
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
index f3c1f5f..43b7522 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
@@ -42,14 +42,6 @@ class ReactivePulsarResourceAdapter {
};
}
- static ReactivePulsarResourceAdapter create(Supplier<PulsarClient>
pulsarClientSupplier) {
- return new ReactivePulsarResourceAdapter(pulsarClientSupplier);
- }
-
- static ReactivePulsarResourceAdapter create(PulsarClient pulsarClient) {
- return create(() -> pulsarClient);
- }
-
ReactiveProducerAdapterFactory producer() {
return new
ReactiveProducerAdapterFactory(this.pulsarClientSupplier);
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
index c9dd6d1..1eb3a3a 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveReaderAdapterFactory {
}
<T> ReactiveReaderAdapter<T> create(Function<PulsarClient,
ReaderBuilder<T>> readerBuilderFactory) {
- return new ReactiveReaderAdapter<T>(this.pulsarClientSupplier,
readerBuilderFactory);
+ return new ReactiveReaderAdapter<>(this.pulsarClientSupplier,
readerBuilderFactory);
}
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
index d49fb11..fff4967 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
@@ -127,7 +127,7 @@ public interface MessageSpecBuilder<T> {
*
* <p>
* The timestamp is milliseconds and based on UTC (eg:
- * {@link System#currentTimeMillis()}.
+ * {@link System#currentTimeMillis()}).
*
* <p>
* <b>Note</b>: messages are only delivered with delay when a consumer
is consuming
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index 7e9f960..07b1d22 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -292,7 +292,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
* Sets the consumer name.
*
* <p>
- * Consumer name is informative and it can be used to indentify a
particular consumer
+ * Consumer name is informative and it can be used to identify a
particular consumer
* instance from the topic stats.
* @param consumerName the consumer name
* @return the consumer builder instance
@@ -341,7 +341,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
/**
* Sets the priority level for the consumer.
- *
+ * <p>
* <b>Shared subscription</b> Sets the priority level for the shared
subscription
* consumers to which the broker gives more priority while dispatching
messages. Here,
* the broker follows descending priorities. (eg: 0=max-priority, 1,
2,..)
@@ -430,13 +430,13 @@ public interface ReactiveMessageConsumerBuilder<T> {
* The timeout needs to be greater than 1 second.
*
* <p>
- * By default, the acknowledge timeout is disabled and that means that
messages
+ * By default, the acknowledgement timeout is disabled and that means
that messages
* delivered to a consumer will not be re-delivered unless the consumer
crashes.
*
* <p>
- * When enabling the acknowledge timeout, if a message is not
acknowledged within the
- * specified timeout it will be re-delivered to the consumer (possibly
to a different
- * consumer in case of a shared subscription).
+ * When enabling the acknowledgement timeout, if a message is not
acknowledged within
+ * the specified timeout it will be re-delivered to the consumer
(possibly to a
+ * different consumer in case of a shared subscription).
* @param ackTimeout the timeout for unacknowledged messages.
* @return the consumer builder instance
* @see ConsumerBuilder#ackTimeout(long, TimeUnit)
@@ -450,10 +450,10 @@ public interface ReactiveMessageConsumerBuilder<T> {
* Sets the granularity of the ack-timeout redelivery.
*
* <p>
- * By default, the tick time is set to 1 second. Using an higher tick
time will reduce
+ * By default, the tick time is set to 1 second. Using a higher tick
time will reduce
* the memory overhead to track messages when the ack-timeout is set to
bigger values
* (eg: 1hour).
- * @param ackTimeoutTickTime the minimum precision for the acknowledge
timeout
+ * @param ackTimeoutTickTime the minimum precision for the
acknowledgement timeout
* messages tracker
* @return the consumer builder instance
* @see ConsumerBuilder#ackTimeoutTickTime(long, TimeUnit)
@@ -528,7 +528,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
* Sets a dead letter policy for the consumer.
*
* <p>
- * By default messages are redelivered indefinitely if they are not
acknowledged. By
+ * By default, messages are redelivered indefinitely if they are not
acknowledged. By
* using a dead letter mechanism, messages that have reached the max
redelivery count
* will be acknowledged automatically and send to the configured dead
letter topic.
*
@@ -539,7 +539,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
*
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
* .build();
* </pre> Default the dead letter topic name is
{TopicName}-{Subscription}-DLQ. You
- * can set o set a custom dead letter topic name like this: <pre>
+ * can set a custom dead letter topic name like this: <pre>
* client.messageConsumer(Schema.BYTES)
* .deadLetterPolicy(DeadLetterPolicy
* .builder()
@@ -604,7 +604,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
}
/**
- * Sets the maximum total receiver queue size across partitons.
+ * Sets the maximum total receiver queue size across partitions.
*
* <p>
* This setting is used to reduce the receiver queue size for
individual partitions
@@ -685,8 +685,8 @@ public interface ReactiveMessageConsumerBuilder<T> {
* </pre> Buffering large number of outstanding uncompleted chunked
messages can
* create memory pressure. It can be guarded by providing a
* {@code maxPendingChunkedMessage} threshold. Once the consumer
reaches this
- * threshold, it drops the outstanding unchunked-messages by silently
acknowledging or
- * asking the broker to redeliver later by marking it unacknowledged.
This behavior
+ * threshold, it drops the outstanding non-chunked messages by silently
acknowledging
+ * or asking the broker to redeliver later by marking it
unacknowledged. This behavior
* can be controlled by setting {@link
#autoAckOldestChunkedMessageOnQueueFull} The
* default value is 10.
* @param maxPendingChunkedMessage the maximum pending chunked messages.
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
index 9a9d248..882a0ed 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
@@ -19,8 +19,8 @@ package org.apache.pulsar.reactive.client.api;
import org.apache.pulsar.client.api.Schema;
/**
- * Apache Pulsar Reactive Client interface
- *
+ * Apache Pulsar Reactive Client interface.
+ * <p>
* Contains methods to create builders for {@link ReactiveMessageSender},
* {@link ReactiveMessageReader} and {@link ReactiveMessageConsumer} instances.
*/
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
index 95285b2..38a3ac3 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
@@ -53,7 +53,7 @@ public final class ApiImplementationFactory {
* @return the result of the message processing
*/
public static <T> MessageResult<T> negativeAcknowledge(MessageId
messageId, T value) {
- return new DefaultMessageResult<T>(messageId, false, value);
+ return new DefaultMessageResult<>(messageId, false, value);
}
/**
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
index 7910116..eb19e99 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
@@ -139,7 +139,7 @@ class DefaultMessageSpecBuilder<T> implements
MessageSpecBuilder<T> {
@Override
public MessageSpec<T> build() {
- return new DefaultMessageSpec<T>(this.key, this.orderingKey,
this.keyBytes, this.value, this.properties,
+ return new DefaultMessageSpec<>(this.key, this.orderingKey,
this.keyBytes, this.value, this.properties,
this.eventTime, this.sequenceId,
this.replicationClusters, this.disableReplication, this.deliverAt,
this.deliverAfterDelay, this.deliverAfterUnit);
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
index 5e30171..638b116 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
@@ -29,7 +29,7 @@ import reactor.util.concurrent.Queues;
/**
* Functions for implementing In-order parallel processing for Pulsar messages
using
* Project Reactor.
- *
+ * <p>
* A processing group is resolved for each message based on the message's key.
The message
* flux is split into group fluxes based on the processing group. Each group
flux is
* processes messages in order (one-by-one). Multiple group fluxes are
processed in
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
index 206a6f3..b7e35d7 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.junit.jupiter.api.Test;
@@ -101,7 +100,7 @@ class MessageSpecTest {
private Duration deliverAfter;
@Override
- public MessageId send() throws PulsarClientException {
+ public MessageId send() {
throw new IllegalStateException("not implemented");
}