This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 3070bf2 Add correlation metadata to the message sent (#115)
3070bf2 is described below
commit 3070bf289f0673d0d30f8429d626ec381eda7ccd
Author: Christophe Bornet <[email protected]>
AuthorDate: Thu Jan 26 18:02:56 2023 +0100
Add correlation metadata to the message sent (#115)
* Add sendManyCorrelated to ReactiveMessageSender
Fixes #22
* Add correlation support for errors
* Test correlating errors
* sendOne shouldn't use correlation
* Add userContext to MessageSpec
* Update
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
* Fix test
Co-authored-by: Lari Hotari <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
---
.../adapter/AdaptedReactiveMessageSender.java | 14 ++-
.../adapter/AdaptedReactiveMessageSenderTest.java | 138 ++++++++++++++++++++-
.../reactive/client/api/MessageSendResult.java | 90 ++++++++++++++
.../pulsar/reactive/client/api/MessageSpec.java | 9 ++
.../reactive/client/api/MessageSpecBuilder.java | 8 ++
.../reactive/client/api/ReactiveMessageSender.java | 17 ++-
.../api/ReactiveMessageSendingException.java | 72 +++++++++++
.../client/internal/api/DefaultMessageSpec.java | 52 +++++++-
.../internal/api/DefaultMessageSpecBuilder.java | 10 +-
.../client/internal/api/ValueOnlyMessageSpec.java | 5 +
.../reactive/client/api/MessageSpecTest.java | 5 +-
11 files changed, 406 insertions(+), 14 deletions(-)
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index c3a8b26..287871b 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -29,9 +29,11 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.reactive.client.api.MessageSendResult;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSendingException;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.reactivestreams.Publisher;
@@ -190,6 +192,12 @@ class AdaptedReactiveMessageSender<T> implements
ReactiveMessageSender<T> {
.usingProducer((producer, transformer) ->
createMessageMono(messageSpec, producer, transformer));
}
+ private Mono<MessageId> createMessageMonoWrapped(MessageSpec<T>
messageSpec, Producer<T> producer,
+ PublisherTransformer transformer) {
+ return createMessageMono(messageSpec, producer, transformer)
+ .onErrorResume((throwable) -> Mono.error(new
ReactiveMessageSendingException(throwable, messageSpec)));
+ }
+
private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec,
Producer<T> producer,
PublisherTransformer transformer) {
return PulsarFutureAdapter.adaptPulsarFuture(() -> {
@@ -200,10 +208,12 @@ class AdaptedReactiveMessageSender<T> implements
ReactiveMessageSender<T> {
}
@Override
- public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs)
{
+ public Flux<MessageSendResult<T>> sendMany(Publisher<MessageSpec<T>>
messageSpecs) {
return createReactiveProducerAdapter()
.usingProducerMany((producer, transformer) ->
Flux.from(messageSpecs).flatMapSequential(
- (messageSpec) ->
createMessageMono(messageSpec, producer, transformer), this.maxConcurrency));
+ (messageSpec) ->
createMessageMonoWrapped(messageSpec, producer, transformer)
+
.map((messageId) -> new MessageSendResult<>(messageId, messageSpec)),
+ this.maxConcurrency));
}
}
diff --git
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index 400b432..65eff88 100644
---
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -26,9 +26,11 @@ import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -45,6 +47,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
+import
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerBase;
@@ -53,10 +56,12 @@ import
org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.internal.DefaultImplementation;
import
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
+import org.apache.pulsar.reactive.client.api.MessageSendResult;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSendingException;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -162,6 +167,37 @@ class AdaptedReactiveMessageSenderTest {
assertThat(messageId1).isEqualTo(MessageId.earliest);
}
+ @Test
+ void sendOneErrorDoesntUseCorrelatedMessageSendingException() throws
Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ ProducerBase<String> producer = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+ given(producer.newMessage()).willAnswer((__) -> {
+ TypedMessageBuilderImpl<String> typedMessageBuilder =
spy(
+ new TypedMessageBuilderImpl<>(producer,
Schema.STRING));
+ given(typedMessageBuilder.sendAsync()).willAnswer((___)
-> {
+ CompletableFuture<MessageId> failed = new
CompletableFuture<>();
+ failed.completeExceptionally(new
ProducerQueueIsFullError("Queue is full"));
+ return failed;
+ });
+ return typedMessageBuilder;
+ });
+
+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.STRING), isNull());
+
+ ReactiveMessageSender<String> reactiveSender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).topic("my-topic").build();
+
+
StepVerifier.create(reactiveSender.sendOne(MessageSpec.of("test1")))
+ // the original exception should be returned
without wrapping it in
+ // ReactiveMessageSendingException
+
.expectError(ProducerQueueIsFullError.class).verify();
+ }
+
@Test
void sendMany() throws Exception {
PulsarClientImpl pulsarClient = spy(
@@ -184,8 +220,8 @@ class AdaptedReactiveMessageSenderTest {
.messageSender(Schema.STRING).topic("my-topic").build();
Flux<MessageSpec<String>> messageSpecs =
Flux.just(MessageSpec.of("test1"), MessageSpec.of("test2"));
-
StepVerifier.create(reactiveSender.sendMany(messageSpecs)).expectNext(MessageId.earliest)
- .expectNext(MessageId.latest).verifyComplete();
+
StepVerifier.create(reactiveSender.sendMany(messageSpecs).map(MessageSendResult::getMessageId))
+
.expectNext(MessageId.earliest).expectNext(MessageId.latest).verifyComplete();
verify(pulsarClient).createProducerAsync(any(), any(),
isNull());
InOrder inOrder = Mockito.inOrder(typedMessageBuilder1,
typedMessageBuilder2);
@@ -195,6 +231,98 @@ class AdaptedReactiveMessageSenderTest {
inOrder.verify(typedMessageBuilder2).sendAsync();
}
+ @Test
+ void sendManyErrorShowsInputInMessage() throws Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ ProducerBase<String> producer = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+ AtomicInteger entryId = new AtomicInteger();
+ List<MessageId> messageIds = new CopyOnWriteArrayList<>();
+ given(producer.newMessage()).willAnswer((__) -> {
+ TypedMessageBuilderImpl<String> typedMessageBuilder =
spy(
+ new TypedMessageBuilderImpl<>(producer,
Schema.STRING));
+ given(typedMessageBuilder.sendAsync()).willAnswer((___)
-> {
+ if (entryId.get() == 1) {
+ CompletableFuture<MessageId> failed =
new CompletableFuture<>();
+ failed.completeExceptionally(new
ProducerQueueIsFullError("Queue is full"));
+ return failed;
+ }
+ MessageId messageId =
DefaultImplementation.getDefaultImplementation().newMessageId(1,
+ entryId.incrementAndGet(), 1);
+ messageIds.add(messageId);
+ return
CompletableFuture.completedFuture(messageId);
+ });
+ return typedMessageBuilder;
+ });
+
+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.STRING), isNull());
+
+ ReactiveMessageSender<String> reactiveSender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).topic("my-topic").build();
+
+ Flux<MessageSpec<String>> messageSpecs =
Flux.just(MessageSpec.of("test1"),
+
MessageSpec.builder("test2").correlationMetadata("my-context").build());
+ StepVerifier.create(reactiveSender.sendMany(messageSpecs))
+ .assertNext((next) ->
assertThat(next.getMessageId()).isEqualTo(messageIds.get(0)))
+ .expectErrorSatisfies((throwable) ->
assertThat(throwable)
+
.isInstanceOf(ReactiveMessageSendingException.class)
+
.extracting(ReactiveMessageSendingException.class::cast)
+ .satisfies((cme) ->
assertThat(cme.toString()).contains("correlation metadata={my-context}")))
+ .verify();
+ }
+
+ @Test
+ void sendManyCorrelated() throws Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ ProducerBase<String> producer = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+ AtomicInteger entryId = new AtomicInteger();
+ List<MessageId> messageIds = new CopyOnWriteArrayList<>();
+ given(producer.newMessage()).willAnswer((__) -> {
+ TypedMessageBuilderImpl<String> typedMessageBuilder =
spy(
+ new TypedMessageBuilderImpl<>(producer,
Schema.STRING));
+ given(typedMessageBuilder.sendAsync()).willAnswer((___)
-> {
+ if (entryId.get() == 2) {
+ CompletableFuture<MessageId> failed =
new CompletableFuture<>();
+ failed.completeExceptionally(new
ProducerQueueIsFullError("Queue is full"));
+ return failed;
+ }
+ MessageId messageId =
DefaultImplementation.getDefaultImplementation().newMessageId(1,
+ entryId.incrementAndGet(), 1);
+ messageIds.add(messageId);
+ return
CompletableFuture.completedFuture(messageId);
+ });
+ return typedMessageBuilder;
+ });
+
+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.STRING), isNull());
+
+ ReactiveMessageSender<String> reactiveSender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).topic("my-topic").build();
+
+ MessageSpec<String> messageSpec1 = MessageSpec.of("test1");
+ Flux<MessageSpec<String>> keysAndMessageSpecs =
Flux.just(messageSpec1,
+
MessageSpec.builder("test2").correlationMetadata(456).build(),
+
MessageSpec.builder("test3").correlationMetadata(789).build());
+
StepVerifier.create(reactiveSender.sendMany(keysAndMessageSpecs)).assertNext((next)
-> {
+
assertThat(next.getMessageId()).isEqualTo(messageIds.get(0));
+
assertThat(next.getMessageSpec()).isEqualTo(messageSpec1);
+ }).assertNext((next) -> {
+
assertThat(next.getMessageId()).isEqualTo(messageIds.get(1));
+ assertThat((int)
next.getCorrelationMetadata()).isEqualTo(456);
+ }).expectErrorSatisfies((throwable) ->
assertThat(throwable).isInstanceOf(ReactiveMessageSendingException.class)
+
.extracting(ReactiveMessageSendingException.class::cast)
+ .satisfies((cme) -> assertThat((int)
cme.getCorrelationMetadata()).isEqualTo(789))).verify();
+ }
+
@ParameterizedTest
@MethodSource
void senderCache(String name, ReactiveMessageSenderCache cache) throws
Exception {
@@ -344,8 +472,10 @@ class AdaptedReactiveMessageSenderTest {
@ParameterizedTest
@CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
void maxInFlightUsingSendMany(int maxInflight, int maxElements) throws
Exception {
- doTestMaxInFlight((reactiveSender, inputFlux) ->
inputFlux.window(3).flatMap(
- (subFlux) -> subFlux.map((i) ->
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100),
+ doTestMaxInFlight(
+ (reactiveSender, inputFlux) ->
inputFlux.window(3)
+ .flatMap((subFlux) ->
subFlux.map((i) -> MessageSpec.of(String.valueOf(i)))
+
.as(reactiveSender::sendMany).map(MessageSendResult::getMessageId), 100),
maxInflight, maxElements);
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSendResult.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSendResult.java
new file mode 100644
index 0000000..871b745
--- /dev/null
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSendResult.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import java.util.Objects;
+
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * Result of sending a message. Holds the spec of the message sent and the
assigned
+ * message ID.
+ *
+ * @param <T> the type of the message
+ */
+public class MessageSendResult<T> {
+
+ private final MessageId messageId;
+
+ private final MessageSpec<T> messageSpec;
+
+ /**
+ * Creates a new instance.
+ * @param messageId the ID assigned to the message
+ * @param messageSpec the message spec that was sent
+ */
+ public MessageSendResult(MessageId messageId, MessageSpec<T>
messageSpec) {
+ this.messageId = messageId;
+ this.messageSpec = messageSpec;
+ }
+
+ /**
+ * Gets the ID assigned to the message.
+ * @return the ID assigned to the message.
+ */
+ public MessageId getMessageId() {
+ return this.messageId;
+ }
+
+ /**
+ * Gets the message spec that was sent.
+ * @return the message spec that was sent
+ */
+ public MessageSpec<T> getMessageSpec() {
+ return this.messageSpec;
+ }
+
+ /**
+ * Gets the correlation metadata of the message spec that was sent.
+ * @param <C> the correlation metadata type
+ * @return the correlation metadata
+ */
+ public <C> C getCorrelationMetadata() {
+ return this.messageSpec.getCorrelationMetadata();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MessageSendResult<?> that = (MessageSendResult<?>) o;
+ return Objects.equals(this.messageId, that.messageId) &&
Objects.equals(this.messageSpec, that.messageSpec);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.messageId, this.messageSpec);
+ }
+
+}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
index c2377a0..d8ac241 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
@@ -48,4 +48,13 @@ public interface MessageSpec<T> {
return
ApiImplementationFactory.createValueOnlyMessageSpec(value);
}
+ /**
+ * Gets the correlation metadata of this message spec.
+ * @param <C> the correlation metadata type
+ * @return the correlation metadata
+ */
+ default <C> C getCorrelationMetadata() {
+ return null;
+ }
+
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
index ba2e2f8..7d8fd2a 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
@@ -155,6 +155,14 @@ public interface MessageSpecBuilder<T> {
*/
MessageSpecBuilder<T> deliverAfter(long delay, TimeUnit unit);
+ /**
+ * Attaches correlation metadata to the message spec so that a send operation
+ * can be correlated with its result.
+ * @param correlationMetadata the correlation metadata to attach
+ * @return the message builder instance
+ */
+ MessageSpecBuilder<T> correlationMetadata(Object correlationMetadata);
+
/**
* Builds the {@link MessageSpec}.
* @return the built message spec
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 8630c85..69dc31a 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -39,12 +39,19 @@ public interface ReactiveMessageSender<T> {
Mono<MessageId> sendOne(MessageSpec<T> messageSpec);
/**
- * Send multiple messages and get the associated message ids in the
same order as the
- * messages sent.
+ * Send multiple messages and get the sending results in the same order
as the
+ * messages sent. The results are {@link MessageSendResult} objects
composed of the
+ * message ID of the message sent and of the original message spec that
was sent. A
+ * {@code correlationMetadata} can be attached to a {@link MessageSpec}
and retrieved
+ * with {@link MessageSendResult#getCorrelationMetadata()} to correlate
the messages
+ * sent with the results. A send error will terminate the returned Flux
with an error
+ * that is wrapped in a {@link ReactiveMessageSendingException} where
the
+ * {@link ReactiveMessageSendingException#getMessageSpec()} method will
return the
+ * MessageSpec sent as input.
* @param messageSpecs the specs of the messages to send
- * @return a publisher that will emit a message id per message
successfully sent in
- * the order that they have been sent
+ * @return a publisher that will emit a {@link MessageSendResult} per
message
+ * successfully sent in the order that they have been sent
*/
- Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
+ Flux<MessageSendResult<T>> sendMany(Publisher<MessageSpec<T>>
messageSpecs);
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
new file mode 100644
index 0000000..6e77ae3
--- /dev/null
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import org.reactivestreams.Publisher;
+
+/**
+ * A wrapper exception used by {@link
ReactiveMessageSender#sendMany(Publisher)}. The
+ * {@link #getMessageSpec()} method will return the message that failed to be
sent.
+ */
+public class ReactiveMessageSendingException extends RuntimeException {
+
+ /**
+ * The message that failed to be sent.
+ */
+ private final MessageSpec<?> messageSpec;
+
+ /**
+ * Creates a new instance.
+ * @param cause the cause of the failure
+ * @param messageSpec the message that failed to be sent
+ */
+ public ReactiveMessageSendingException(Throwable cause, MessageSpec<?>
messageSpec) {
+ super(cause);
+ this.messageSpec = messageSpec;
+ }
+
+ @Override
+ public Throwable fillInStackTrace() {
+ return this;
+ }
+
+ /**
+ * Returns the message that failed to be sent.
+ * @return the message that failed to be sent.
+ */
+ public MessageSpec<?> getMessageSpec() {
+ return this.messageSpec;
+ }
+
+ /**
+ * Returns the correlation metadata of the message that failed to be
sent.
+ * @param <C> the correlation metadata type
+ * @return the correlation metadata of the message that failed to be
sent.
+ */
+ public <C> C getCorrelationMetadata() {
+ return this.messageSpec.getCorrelationMetadata();
+ }
+
+ @Override
+ public String getMessage() {
+ return "Message sending failed for message with correlation
metadata={" + this.getCorrelationMetadata() + "}";
+ }
+
+}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
index d348f8d..8b89945 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.reactive.client.internal.api;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -59,9 +60,11 @@ class DefaultMessageSpec<T> implements
InternalMessageSpec<T> {
private final TimeUnit deliverAfterUnit;
+ private final Object correlationMetadata;
+
DefaultMessageSpec(String key, byte[] orderingKey, byte[] keyBytes, T
value, Map<String, String> properties,
Long eventTime, Long sequenceId, List<String>
replicationClusters, boolean disableReplication,
- Long deliverAt, Long deliverAfterDelay, TimeUnit
deliverAfterUnit) {
+ Long deliverAt, Long deliverAfterDelay, TimeUnit
deliverAfterUnit, Object correlationMetadata) {
this.key = key;
this.orderingKey = orderingKey;
this.keyBytes = keyBytes;
@@ -74,6 +77,7 @@ class DefaultMessageSpec<T> implements InternalMessageSpec<T>
{
this.deliverAt = deliverAt;
this.deliverAfterDelay = deliverAfterDelay;
this.deliverAfterUnit = deliverAfterUnit;
+ this.correlationMetadata = correlationMetadata;
}
@Override
@@ -111,4 +115,50 @@ class DefaultMessageSpec<T> implements
InternalMessageSpec<T> {
}
}
+ @Override
+ public <C> C getCorrelationMetadata() {
+ return (C) this.correlationMetadata;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new
StringBuilder().append("DefaultMessageSpec{");
+ stringBuilder.append("value=").append(this.value);
+ if (this.key != null) {
+ stringBuilder.append(",
key='").append(this.key).append('\'');
+ }
+ if (this.orderingKey != null) {
+ stringBuilder.append(",
orderingKey=").append(Arrays.toString(this.orderingKey));
+ }
+ if (this.keyBytes != null) {
+ stringBuilder.append(",
keyBytes=").append(Arrays.toString(this.keyBytes));
+ }
+ if (this.properties != null) {
+ stringBuilder.append(",
properties=").append(this.properties);
+ }
+ if (this.eventTime != null) {
+ stringBuilder.append(",
eventTime=").append(this.eventTime);
+ }
+ if (this.sequenceId != null) {
+ stringBuilder.append(",
sequenceId=").append(this.sequenceId);
+ }
+ if (this.replicationClusters != null) {
+ stringBuilder.append(",
replicationClusters=").append(this.replicationClusters);
+ }
+ if (this.disableReplication) {
+ stringBuilder.append(",
disableReplication=").append(this.disableReplication);
+ }
+ if (this.deliverAt != null) {
+ stringBuilder.append(",
deliverAt=").append(this.deliverAt);
+ }
+ if (this.deliverAfterDelay != null) {
+ stringBuilder.append(",
deliverAfterDelay=").append(this.deliverAfterDelay);
+ stringBuilder.append(",
deliverAfterUnit=").append(this.deliverAfterUnit);
+ }
+ if (this.correlationMetadata != null) {
+ stringBuilder.append(",
correlationMetadata=").append(this.correlationMetadata);
+ }
+ return stringBuilder.append('}').toString();
+ }
+
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
index 0696609..7863ec3 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
@@ -61,6 +61,8 @@ class DefaultMessageSpecBuilder<T> implements
MessageSpecBuilder<T> {
private TimeUnit deliverAfterUnit;
+ private Object correlationMetadata;
+
@Override
public MessageSpecBuilder<T> key(String key) {
this.key = key;
@@ -140,11 +142,17 @@ class DefaultMessageSpecBuilder<T> implements
MessageSpecBuilder<T> {
return this;
}
+ @Override
+ public MessageSpecBuilder<T> correlationMetadata(Object
correlationMetadata) {
+ this.correlationMetadata = correlationMetadata;
+ return this;
+ }
+
@Override
public MessageSpec<T> build() {
return new DefaultMessageSpec<>(this.key, this.orderingKey,
this.keyBytes, this.value, this.properties,
this.eventTime, this.sequenceId,
this.replicationClusters, this.disableReplication, this.deliverAt,
- this.deliverAfterDelay, this.deliverAfterUnit);
+ this.deliverAfterDelay, this.deliverAfterUnit,
this.correlationMetadata);
}
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java
index ec925a1..c29404b 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java
@@ -39,4 +39,9 @@ class ValueOnlyMessageSpec<T> implements
InternalMessageSpec<T> {
typedMessageBuilder.value(this.value);
}
+ @Override
+ public String toString() {
+ return "ValueOnlyMessageSpec{" + "value=" + this.value + '}';
+ }
+
}
diff --git
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
index 751e19e..fe003be 100644
---
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
+++
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
@@ -44,7 +44,7 @@ class MessageSpecTest {
.orderingKey(new byte[] { 2
}).property("my-prop-key-1", "my-prop-value-1")
.properties(Collections.singletonMap("my-prop-key-2",
"my-prop-value-2")).eventTime(3).sequenceId(4)
.replicationClusters(Collections.singletonList("my-cluster")).disableReplication().deliverAt(5)
- .deliverAfter(6, TimeUnit.SECONDS).build();
+ .deliverAfter(6,
TimeUnit.SECONDS).correlationMetadata("my-context").build();
assertThat(messageSpec).isInstanceOf(InternalMessageSpec.class);
@@ -63,6 +63,9 @@ class MessageSpecTest {
assertThat(typedMessageBuilder.getDisableReplication()).isTrue();
assertThat(typedMessageBuilder.getDeliverAt()).isEqualTo(5);
assertThat(typedMessageBuilder.getDeliverAfter()).hasSeconds(6);
+
+ assertThat((String)
messageSpec.getCorrelationMetadata()).isEqualTo("my-context");
+
}
@Test