This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 3070bf2  Add correlation metadata to the message sent (#115)
3070bf2 is described below

commit 3070bf289f0673d0d30f8429d626ec381eda7ccd
Author: Christophe Bornet <[email protected]>
AuthorDate: Thu Jan 26 18:02:56 2023 +0100

    Add correlation metadata to the message sent (#115)
    
    * Add sendManyCorrelated to ReactiveMessageSender
    
    Fixes #22
    
    * Add correlation support for errors
    
    * Test correlating errors
    
    * sendOne shouldn't use correlation
    
    * Add userContext to MessageSpec
    
    * Update pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
    
    * Fix test
    
    Co-authored-by: Lari Hotari <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../adapter/AdaptedReactiveMessageSender.java      |  14 ++-
 .../adapter/AdaptedReactiveMessageSenderTest.java  | 138 ++++++++++++++++++++-
 .../reactive/client/api/MessageSendResult.java     |  90 ++++++++++++++
 .../pulsar/reactive/client/api/MessageSpec.java    |   9 ++
 .../reactive/client/api/MessageSpecBuilder.java    |   8 ++
 .../reactive/client/api/ReactiveMessageSender.java |  17 ++-
 .../api/ReactiveMessageSendingException.java       |  72 +++++++++++
 .../client/internal/api/DefaultMessageSpec.java    |  52 +++++++-
 .../internal/api/DefaultMessageSpecBuilder.java    |  10 +-
 .../client/internal/api/ValueOnlyMessageSpec.java  |   5 +
 .../reactive/client/api/MessageSpecTest.java       |   5 +-
 11 files changed, 406 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index c3a8b26..287871b 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -29,9 +29,11 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.reactive.client.api.MessageSendResult;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSendingException;
 import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
 import org.reactivestreams.Publisher;
@@ -190,6 +192,12 @@ class AdaptedReactiveMessageSender<T> implements 
ReactiveMessageSender<T> {
                                .usingProducer((producer, transformer) -> 
createMessageMono(messageSpec, producer, transformer));
        }
 
+       private Mono<MessageId> createMessageMonoWrapped(MessageSpec<T> 
messageSpec, Producer<T> producer,
+                       PublisherTransformer transformer) {
+               return createMessageMono(messageSpec, producer, transformer)
+                               .onErrorResume((throwable) -> Mono.error(new 
ReactiveMessageSendingException(throwable, messageSpec)));
+       }
+
        private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, 
Producer<T> producer,
                        PublisherTransformer transformer) {
                return PulsarFutureAdapter.adaptPulsarFuture(() -> {
@@ -200,10 +208,12 @@ class AdaptedReactiveMessageSender<T> implements 
ReactiveMessageSender<T> {
        }
 
        @Override
-       public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs) 
{
+       public Flux<MessageSendResult<T>> sendMany(Publisher<MessageSpec<T>> 
messageSpecs) {
                return createReactiveProducerAdapter()
                                .usingProducerMany((producer, transformer) -> 
Flux.from(messageSpecs).flatMapSequential(
-                                               (messageSpec) -> 
createMessageMono(messageSpec, producer, transformer), this.maxConcurrency));
+                                               (messageSpec) -> 
createMessageMonoWrapped(messageSpec, producer, transformer)
+                                                               
.map((messageId) -> new MessageSendResult<>(messageId, messageSpec)),
+                                               this.maxConcurrency));
        }
 
 }
diff --git 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index 400b432..65eff88 100644
--- 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -26,9 +26,11 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -45,6 +47,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerBase;
@@ -53,10 +56,12 @@ import 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import 
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
+import org.apache.pulsar.reactive.client.api.MessageSendResult;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSendingException;
 import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -162,6 +167,37 @@ class AdaptedReactiveMessageSenderTest {
                assertThat(messageId1).isEqualTo(MessageId.earliest);
        }
 
+       @Test
+       void sendOneErrorDoesntUseCorrelatedMessageSendingException() throws 
Exception {
+               PulsarClientImpl pulsarClient = spy(
+                               (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+               ProducerBase<String> producer = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+               given(producer.newMessage()).willAnswer((__) -> {
+                       TypedMessageBuilderImpl<String> typedMessageBuilder = 
spy(
+                                       new TypedMessageBuilderImpl<>(producer, 
Schema.STRING));
+                       given(typedMessageBuilder.sendAsync()).willAnswer((___) 
-> {
+                               CompletableFuture<MessageId> failed = new 
CompletableFuture<>();
+                               failed.completeExceptionally(new 
ProducerQueueIsFullError("Queue is full"));
+                               return failed;
+                       });
+                       return typedMessageBuilder;
+               });
+
+               
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.STRING), isNull());
+
+               ReactiveMessageSender<String> reactiveSender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+                               
.messageSender(Schema.STRING).topic("my-topic").build();
+
+               
StepVerifier.create(reactiveSender.sendOne(MessageSpec.of("test1")))
+                               // the original exception should be returned 
without wrapping it in
+                               // ReactiveMessageSendingException
+                               
.expectError(ProducerQueueIsFullError.class).verify();
+       }
+
        @Test
        void sendMany() throws Exception {
                PulsarClientImpl pulsarClient = spy(
@@ -184,8 +220,8 @@ class AdaptedReactiveMessageSenderTest {
                                
.messageSender(Schema.STRING).topic("my-topic").build();
 
                Flux<MessageSpec<String>> messageSpecs = 
Flux.just(MessageSpec.of("test1"), MessageSpec.of("test2"));
-               
StepVerifier.create(reactiveSender.sendMany(messageSpecs)).expectNext(MessageId.earliest)
-                               .expectNext(MessageId.latest).verifyComplete();
+               
StepVerifier.create(reactiveSender.sendMany(messageSpecs).map(MessageSendResult::getMessageId))
+                               
.expectNext(MessageId.earliest).expectNext(MessageId.latest).verifyComplete();
 
                verify(pulsarClient).createProducerAsync(any(), any(), 
isNull());
                InOrder inOrder = Mockito.inOrder(typedMessageBuilder1, 
typedMessageBuilder2);
@@ -195,6 +231,98 @@ class AdaptedReactiveMessageSenderTest {
                inOrder.verify(typedMessageBuilder2).sendAsync();
        }
 
+       @Test
+       void sendManyErrorShowsInputInMessage() throws Exception {
+               PulsarClientImpl pulsarClient = spy(
+                               (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+               ProducerBase<String> producer = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+               AtomicInteger entryId = new AtomicInteger();
+               List<MessageId> messageIds = new CopyOnWriteArrayList<>();
+               given(producer.newMessage()).willAnswer((__) -> {
+                       TypedMessageBuilderImpl<String> typedMessageBuilder = 
spy(
+                                       new TypedMessageBuilderImpl<>(producer, 
Schema.STRING));
+                       given(typedMessageBuilder.sendAsync()).willAnswer((___) 
-> {
+                               if (entryId.get() == 1) {
+                                       CompletableFuture<MessageId> failed = 
new CompletableFuture<>();
+                                       failed.completeExceptionally(new 
ProducerQueueIsFullError("Queue is full"));
+                                       return failed;
+                               }
+                               MessageId messageId = 
DefaultImplementation.getDefaultImplementation().newMessageId(1,
+                                               entryId.incrementAndGet(), 1);
+                               messageIds.add(messageId);
+                               return 
CompletableFuture.completedFuture(messageId);
+                       });
+                       return typedMessageBuilder;
+               });
+
+               
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.STRING), isNull());
+
+               ReactiveMessageSender<String> reactiveSender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+                               
.messageSender(Schema.STRING).topic("my-topic").build();
+
+               Flux<MessageSpec<String>> messageSpecs = 
Flux.just(MessageSpec.of("test1"),
+                               
MessageSpec.builder("test2").correlationMetadata("my-context").build());
+               StepVerifier.create(reactiveSender.sendMany(messageSpecs))
+                               .assertNext((next) -> 
assertThat(next.getMessageId()).isEqualTo(messageIds.get(0)))
+                               .expectErrorSatisfies((throwable) -> 
assertThat(throwable)
+                                               
.isInstanceOf(ReactiveMessageSendingException.class)
+                                               
.extracting(ReactiveMessageSendingException.class::cast)
+                                               .satisfies((cme) -> 
assertThat(cme.toString()).contains("correlation metadata={my-context}")))
+                               .verify();
+       }
+
+       @Test
+       void sendManyCorrelated() throws Exception {
+               PulsarClientImpl pulsarClient = spy(
+                               (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+               ProducerBase<String> producer = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+               AtomicInteger entryId = new AtomicInteger();
+               List<MessageId> messageIds = new CopyOnWriteArrayList<>();
+               given(producer.newMessage()).willAnswer((__) -> {
+                       TypedMessageBuilderImpl<String> typedMessageBuilder = 
spy(
+                                       new TypedMessageBuilderImpl<>(producer, 
Schema.STRING));
+                       given(typedMessageBuilder.sendAsync()).willAnswer((___) 
-> {
+                               if (entryId.get() == 2) {
+                                       CompletableFuture<MessageId> failed = 
new CompletableFuture<>();
+                                       failed.completeExceptionally(new 
ProducerQueueIsFullError("Queue is full"));
+                                       return failed;
+                               }
+                               MessageId messageId = 
DefaultImplementation.getDefaultImplementation().newMessageId(1,
+                                               entryId.incrementAndGet(), 1);
+                               messageIds.add(messageId);
+                               return 
CompletableFuture.completedFuture(messageId);
+                       });
+                       return typedMessageBuilder;
+               });
+
+               
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.STRING), isNull());
+
+               ReactiveMessageSender<String> reactiveSender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+                               
.messageSender(Schema.STRING).topic("my-topic").build();
+
+               MessageSpec<String> messageSpec1 = MessageSpec.of("test1");
+               Flux<MessageSpec<String>> keysAndMessageSpecs = 
Flux.just(messageSpec1,
+                               
MessageSpec.builder("test2").correlationMetadata(456).build(),
+                               
MessageSpec.builder("test3").correlationMetadata(789).build());
+               
StepVerifier.create(reactiveSender.sendMany(keysAndMessageSpecs)).assertNext((next)
 -> {
+                       
assertThat(next.getMessageId()).isEqualTo(messageIds.get(0));
+                       
assertThat(next.getMessageSpec()).isEqualTo(messageSpec1);
+               }).assertNext((next) -> {
+                       
assertThat(next.getMessageId()).isEqualTo(messageIds.get(1));
+                       assertThat((int) 
next.getCorrelationMetadata()).isEqualTo(456);
+               }).expectErrorSatisfies((throwable) -> 
assertThat(throwable).isInstanceOf(ReactiveMessageSendingException.class)
+                               
.extracting(ReactiveMessageSendingException.class::cast)
+                               .satisfies((cme) -> assertThat((int) 
cme.getCorrelationMetadata()).isEqualTo(789))).verify();
+       }
+
        @ParameterizedTest
        @MethodSource
        void senderCache(String name, ReactiveMessageSenderCache cache) throws 
Exception {
@@ -344,8 +472,10 @@ class AdaptedReactiveMessageSenderTest {
        @ParameterizedTest
        @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
        void maxInFlightUsingSendMany(int maxInflight, int maxElements) throws 
Exception {
-               doTestMaxInFlight((reactiveSender, inputFlux) -> 
inputFlux.window(3).flatMap(
-                               (subFlux) -> subFlux.map((i) -> 
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100),
+               doTestMaxInFlight(
+                               (reactiveSender, inputFlux) -> 
inputFlux.window(3)
+                                               .flatMap((subFlux) -> 
subFlux.map((i) -> MessageSpec.of(String.valueOf(i)))
+                                                               
.as(reactiveSender::sendMany).map(MessageSendResult::getMessageId), 100),
                                maxInflight, maxElements);
        }
 
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSendResult.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSendResult.java
new file mode 100644
index 0000000..871b745
--- /dev/null
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSendResult.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import java.util.Objects;
+
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * Result of a message sending. Holds the spec of the message sent and the 
assigned
+ * message ID.
+ *
+ * @param <T> the type of the message
+ */
+public class MessageSendResult<T> {
+
+       private final MessageId messageId;
+
+       private final MessageSpec<T> messageSpec;
+
+       /**
+        * Creates a new instance.
+        * @param messageId the ID assigned to the message
+        * @param messageSpec the message spec that was sent
+        */
+       public MessageSendResult(MessageId messageId, MessageSpec<T> 
messageSpec) {
+               this.messageId = messageId;
+               this.messageSpec = messageSpec;
+       }
+
+       /**
+        * Gets the ID assigned to the message.
+        * @return the ID assigned to the message.
+        */
+       public MessageId getMessageId() {
+               return this.messageId;
+       }
+
+       /**
+        * Gets the message spec that was sent.
+        * @return the message spec that was sent
+        */
+       public MessageSpec<T> getMessageSpec() {
+               return this.messageSpec;
+       }
+
+       /**
+        * Gets the correlation metadata of the message spec that was sent.
+        * @param <C> the correlation metadata type
+        * @return the correlation metadata
+        */
+       public <C> C getCorrelationMetadata() {
+               return this.messageSpec.getCorrelationMetadata();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               MessageSendResult<?> that = (MessageSendResult<?>) o;
+               return Objects.equals(this.messageId, that.messageId) && 
Objects.equals(this.messageSpec, that.messageSpec);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(this.messageId, this.messageSpec);
+       }
+
+}
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
index c2377a0..d8ac241 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpec.java
@@ -48,4 +48,13 @@ public interface MessageSpec<T> {
                return 
ApiImplementationFactory.createValueOnlyMessageSpec(value);
        }
 
+       /**
+        * Gets the correlation metadata of this message spec.
+        * @param <C> the correlation metadata type
+        * @return the correlation metadata
+        */
+       default <C> C getCorrelationMetadata() {
+               return null;
+       }
+
 }
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
index ba2e2f8..7d8fd2a 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
@@ -155,6 +155,14 @@ public interface MessageSpecBuilder<T> {
         */
        MessageSpecBuilder<T> deliverAfter(long delay, TimeUnit unit);
 
+       /**
+        * Attaches correlation metadata to the message spec so that a send operation can
+        * be correlated with its result.
+        * @param correlationMetadata the correlation metadata to attach
+        * @return the message builder instance
+        */
+       MessageSpecBuilder<T> correlationMetadata(Object correlationMetadata);
+
        /**
         * Builds the {@link MessageSpec}.
         * @return the built message spec
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 8630c85..69dc31a 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -39,12 +39,19 @@ public interface ReactiveMessageSender<T> {
        Mono<MessageId> sendOne(MessageSpec<T> messageSpec);
 
        /**
-        * Send multiple messages and get the associated message ids in the 
same order as the
-        * messages sent.
+        * Send multiple messages and get the sending results in the same order 
as the
+        * messages sent. The results are {@link MessageSendResult} objects 
composed of the
+        * message ID of the message sent and of the original message spec that 
was sent. A
+        * {@code correlationMetadata} can be attached to a {@link MessageSpec} 
and retrieved
+        * with {@link MessageSendResult#getCorrelationMetadata()} to correlate 
the messages
+        * sent with the results. A send error will terminate the returned Flux 
with an error
+        * that is wrapped in a {@link ReactiveMessageSendingException} where 
the
+        * {@link ReactiveMessageSendingException#getMessageSpec()} method will 
return the
+        * MessageSpec sent as input.
         * @param messageSpecs the specs of the messages to send
-        * @return a publisher that will emit a message id per message 
successfully sent in
-        * the order that they have been sent
+        * @return a publisher that will emit a {@link MessageSendResult} per 
message
+        * successfully sent in the order that they have been sent
         */
-       Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
+       Flux<MessageSendResult<T>> sendMany(Publisher<MessageSpec<T>> 
messageSpecs);
 
 }
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
new file mode 100644
index 0000000..6e77ae3
--- /dev/null
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSendingException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import org.reactivestreams.Publisher;
+
+/**
+ * A wrapper exception used by {@link 
ReactiveMessageSender#sendMany(Publisher)}. The
+ * {@link #getMessageSpec()} method will return the message that failed to be 
sent.
+ */
+public class ReactiveMessageSendingException extends RuntimeException {
+
+       /**
+        * The message that failed to be sent.
+        */
+       private final MessageSpec<?> messageSpec;
+
+       /**
+        * Creates a new instance.
+        * @param cause the cause of the failure
+        * @param messageSpec the message that failed to be sent
+        */
+       public ReactiveMessageSendingException(Throwable cause, MessageSpec<?> 
messageSpec) {
+               super(cause);
+               this.messageSpec = messageSpec;
+       }
+
+       @Override
+       public Throwable fillInStackTrace() {
+               return this;
+       }
+
+       /**
+        * Returns the message that failed to be sent.
+        * @return the message that failed to be sent.
+        */
+       public MessageSpec<?> getMessageSpec() {
+               return this.messageSpec;
+       }
+
+       /**
+        * Returns the correlation metadata of the message that failed to be 
sent.
+        * @param <C> the correlation metadata type
+        * @return the correlation metadata of the message that failed to be 
sent.
+        */
+       public <C> C getCorrelationMetadata() {
+               return this.messageSpec.getCorrelationMetadata();
+       }
+
+       @Override
+       public String getMessage() {
+               return "Message sending failed for message with correlation 
metadata={" + this.getCorrelationMetadata() + "}";
+       }
+
+}
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
index d348f8d..8b89945 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpec.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.reactive.client.internal.api;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -59,9 +60,11 @@ class DefaultMessageSpec<T> implements 
InternalMessageSpec<T> {
 
        private final TimeUnit deliverAfterUnit;
 
+       private final Object correlationMetadata;
+
        DefaultMessageSpec(String key, byte[] orderingKey, byte[] keyBytes, T 
value, Map<String, String> properties,
                        Long eventTime, Long sequenceId, List<String> 
replicationClusters, boolean disableReplication,
-                       Long deliverAt, Long deliverAfterDelay, TimeUnit 
deliverAfterUnit) {
+                       Long deliverAt, Long deliverAfterDelay, TimeUnit 
deliverAfterUnit, Object correlationMetadata) {
                this.key = key;
                this.orderingKey = orderingKey;
                this.keyBytes = keyBytes;
@@ -74,6 +77,7 @@ class DefaultMessageSpec<T> implements InternalMessageSpec<T> 
{
                this.deliverAt = deliverAt;
                this.deliverAfterDelay = deliverAfterDelay;
                this.deliverAfterUnit = deliverAfterUnit;
+               this.correlationMetadata = correlationMetadata;
        }
 
        @Override
@@ -111,4 +115,50 @@ class DefaultMessageSpec<T> implements 
InternalMessageSpec<T> {
                }
        }
 
+       @Override
+       public <C> C getCorrelationMetadata() {
+               return (C) this.correlationMetadata;
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder stringBuilder = new 
StringBuilder().append("DefaultMessageSpec{");
+               stringBuilder.append("value=").append(this.value);
+               if (this.key != null) {
+                       stringBuilder.append(", 
key='").append(this.key).append('\'');
+               }
+               if (this.orderingKey != null) {
+                       stringBuilder.append(", 
orderingKey=").append(Arrays.toString(this.orderingKey));
+               }
+               if (this.keyBytes != null) {
+                       stringBuilder.append(", 
keyBytes=").append(Arrays.toString(this.keyBytes));
+               }
+               if (this.properties != null) {
+                       stringBuilder.append(", 
properties=").append(this.properties);
+               }
+               if (this.eventTime != null) {
+                       stringBuilder.append(", 
eventTime=").append(this.eventTime);
+               }
+               if (this.sequenceId != null) {
+                       stringBuilder.append(", 
sequenceId=").append(this.sequenceId);
+               }
+               if (this.replicationClusters != null) {
+                       stringBuilder.append(", 
replicationClusters=").append(this.replicationClusters);
+               }
+               if (this.disableReplication) {
+                       stringBuilder.append(", 
disableReplication=").append(this.disableReplication);
+               }
+               if (this.deliverAt != null) {
+                       stringBuilder.append(", 
deliverAt=").append(this.deliverAt);
+               }
+               if (this.deliverAfterDelay != null) {
+                       stringBuilder.append(", 
deliverAfterDelay=").append(this.deliverAfterDelay);
+                       stringBuilder.append(", 
deliverAfterUnit=").append(this.deliverAfterUnit);
+               }
+               if (this.correlationMetadata != null) {
+                       stringBuilder.append(", 
correlationMetadata=").append(this.correlationMetadata);
+               }
+               return stringBuilder.append('}').toString();
+       }
+
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
index 0696609..7863ec3 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
@@ -61,6 +61,8 @@ class DefaultMessageSpecBuilder<T> implements MessageSpecBuilder<T> {
 
        private TimeUnit deliverAfterUnit;
 
+       private Object correlationMetadata;
+
        @Override
        public MessageSpecBuilder<T> key(String key) {
                this.key = key;
@@ -140,11 +142,17 @@ class DefaultMessageSpecBuilder<T> implements MessageSpecBuilder<T> {
                return this;
        }
 
+       @Override
+       public MessageSpecBuilder<T> correlationMetadata(Object correlationMetadata) {
+               this.correlationMetadata = correlationMetadata;
+               return this;
+       }
+
        @Override
        public MessageSpec<T> build() {
                return new DefaultMessageSpec<>(this.key, this.orderingKey, this.keyBytes, this.value, this.properties,
                                this.eventTime, this.sequenceId, this.replicationClusters, this.disableReplication, this.deliverAt,
-                               this.deliverAfterDelay, this.deliverAfterUnit);
+                               this.deliverAfterDelay, this.deliverAfterUnit, this.correlationMetadata);
        }
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java
index ec925a1..c29404b 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ValueOnlyMessageSpec.java
@@ -39,4 +39,9 @@ class ValueOnlyMessageSpec<T> implements InternalMessageSpec<T> {
                typedMessageBuilder.value(this.value);
        }
 
+       @Override
+       public String toString() {
+               return "ValueOnlyMessageSpec{" + "value=" + this.value + '}';
+       }
+
 }
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
index 751e19e..fe003be 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
@@ -44,7 +44,7 @@ class MessageSpecTest {
                                .orderingKey(new byte[] { 2 }).property("my-prop-key-1", "my-prop-value-1")
                                .properties(Collections.singletonMap("my-prop-key-2", "my-prop-value-2")).eventTime(3).sequenceId(4)
                                .replicationClusters(Collections.singletonList("my-cluster")).disableReplication().deliverAt(5)
-                               .deliverAfter(6, TimeUnit.SECONDS).build();
+                               .deliverAfter(6, TimeUnit.SECONDS).correlationMetadata("my-context").build();
 
                assertThat(messageSpec).isInstanceOf(InternalMessageSpec.class);
 
@@ -63,6 +63,9 @@ class MessageSpecTest {
                assertThat(typedMessageBuilder.getDisableReplication()).isTrue();
                assertThat(typedMessageBuilder.getDeliverAt()).isEqualTo(5);
                assertThat(typedMessageBuilder.getDeliverAfter()).hasSeconds(6);
+
+               assertThat((String) messageSpec.getCorrelationMetadata()).isEqualTo("my-context");
+
        }
 
        @Test

Reply via email to