This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 49083d9  Add tests for sender cache (#81)
49083d9 is described below

commit 49083d9d7fc4901b0622a37d747accda16c2c0b9
Author: Christophe Bornet <[email protected]>
AuthorDate: Sat Dec 3 10:53:28 2022 +0100

    Add tests for sender cache (#81)
---
 .../adapter/AdaptedReactiveMessageSenderTest.java  | 145 +++++++++++++++++++++
 1 file changed, 145 insertions(+)

diff --git 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index ea570f0..8d24258 100644
--- 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -17,11 +17,13 @@
 package org.apache.pulsar.reactive.client.internal.adapter;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
@@ -41,8 +43,13 @@ import 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import 
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
 import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import reactor.core.publisher.Flux;
@@ -55,6 +62,7 @@ import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -173,4 +181,141 @@ class AdaptedReactiveMessageSenderTest {
                inOrder.verify(typedMessageBuilder2).sendAsync();
        }
 
+       @ParameterizedTest
+       @MethodSource
+       void senderCache(String name, ReactiveMessageSenderCache cache) throws 
Exception {
+               PulsarClientImpl pulsarClient = spy(
+                               (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://dummy";).build());
+
+               ProducerBase<String> producer = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+               doReturn(true).when(producer).isConnected();
+               TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+                               new TypedMessageBuilderImpl<>(producer, 
Schema.STRING));
+               
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+
+               doReturn(typedMessageBuilder).when(producer).newMessage();
+
+               
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.STRING), isNull());
+
+               ProducerBase<Integer> producer2 = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
+               doReturn(true).when(producer2).isConnected();
+               TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
+                               new TypedMessageBuilderImpl<>(producer2, 
Schema.INT32));
+               
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
+
+               doReturn(typedMessageBuilder2).when(producer2).newMessage();
+
+               
doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.INT32), isNull());
+
+               // Sender without cache
+               createSenderAndSendMessages(pulsarClient, Schema.STRING, 
"my-topic", null, new String[] { "a", "b", "c" });
+               verify(pulsarClient, times(3)).createProducerAsync(any(), 
any(), isNull());
+
+               // Sender with cache
+               createSenderAndSendMessages(pulsarClient, Schema.STRING, 
"my-topic", cache, new String[] { "a", "b", "c" });
+               verify(pulsarClient, times(4)).createProducerAsync(any(), 
any(), isNull());
+
+               // Other sender wih same cache, same Schema, same Producer 
config
+               createSenderAndSendMessages(pulsarClient, Schema.STRING, 
"my-topic", cache, new String[] { "d", "e", "f" });
+               verify(pulsarClient, times(4)).createProducerAsync(any(), 
any(), isNull());
+
+               // Other sender wih same cache, same Schema, different Producer 
config
+               createSenderAndSendMessages(pulsarClient, Schema.STRING, 
"my-other-topic", cache,
+                               new String[] { "a", "b", "c" });
+               verify(pulsarClient, times(5)).createProducerAsync(any(), 
any(), isNull());
+
+               // Other sender wih same cache, different Schema, same Producer 
config
+               createSenderAndSendMessages(pulsarClient, Schema.INT32, 
"my-topic", cache, new Integer[] { 42, 43, 44 });
+               verify(pulsarClient, times(6)).createProducerAsync(any(), 
any(), isNull());
+
+       }
+
+       private static Stream<Arguments> senderCache() {
+               return Arrays.asList(
+                               
Arguments.of("ConcurrentHashMapProducerCacheProvider",
+                                               
AdaptedReactivePulsarClientFactory.createCache(new 
ConcurrentHashMapProducerCacheProvider())),
+                               Arguments.of("Default", 
AdaptedReactivePulsarClientFactory.createCache())).stream();
+       }
+
+       private static <T> void createSenderAndSendMessages(PulsarClient 
client, Schema<T> schema, String topic,
+                       ReactiveMessageSenderCache cache, T[] values) {
+               assertThat(values).hasSize(3);
+               ReactiveMessageSenderBuilder<T> builder = 
AdaptedReactivePulsarClientFactory.create(client)
+                               .messageSender(schema).topic(topic);
+               if (cache != null) {
+                       builder.cache(cache);
+               }
+               ReactiveMessageSender<T> sender = builder.build();
+
+               
sender.sendOne(MessageSpec.of(values[0])).then(sender.sendOne(MessageSpec.of(values[1])))
+                               
.thenMany(Flux.just(MessageSpec.of(values[2])).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
+       }
+
+       @Test
+       void senderCacheEntryRecreatedIfProducerClosed() throws Exception {
+               PulsarClientImpl pulsarClient = spy(
+                               (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://dummy";).build());
+
+               ProducerBase<String> producer = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+               doReturn(true).when(producer).isConnected();
+               TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+                               new TypedMessageBuilderImpl<>(producer, 
Schema.STRING));
+               
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+               doReturn(typedMessageBuilder).when(producer).newMessage();
+
+               
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.STRING), isNull());
+
+               ReactiveMessageSenderCache cache = 
AdaptedReactivePulsarClientFactory.createCache();
+               createSenderAndSendMessages(pulsarClient, Schema.STRING, 
"my-topic", cache, new String[] { "a", "b", "c" });
+
+               ReactiveMessageSender<String> sender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+                               
.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
+
+               
sender.sendOne(MessageSpec.of("a")).block(Duration.ofSeconds(5));
+               
sender.sendOne(MessageSpec.of("b")).block(Duration.ofSeconds(5));
+
+               verify(pulsarClient).createProducerAsync(any(), any(), 
isNull());
+
+               // Disconnect the producer and send a new message
+               doReturn(false).when(producer).isConnected();
+               CompletableFuture<MessageId> messageIdFuture = 
sender.sendOne(MessageSpec.of("c")).toFuture();
+
+               Thread.sleep(100);
+
+               // Check that the disconnected producer is flushed and closed 
and a new producer
+               // is created
+               verify(producer).closeAsync();
+               verify(producer).flushAsync();
+               verify(pulsarClient, times(2)).createProducerAsync(any(), 
any(), isNull());
+
+               // Check that we wait for the producer to be reconnected
+               Thread.sleep(1000);
+               assertThat(messageIdFuture.isDone()).isFalse();
+
+               // Re-connect the producer
+               doReturn(true).when(producer).isConnected();
+               messageIdFuture.get(5, TimeUnit.SECONDS);
+
+               verify(pulsarClient, times(2)).createProducerAsync(any(), 
any(), isNull());
+
+               
sender.sendOne(MessageSpec.of("d")).block(Duration.ofSeconds(5));
+               verify(pulsarClient, times(2)).createProducerAsync(any(), 
any(), isNull());
+
+               // Verify that an error is emitted if the producer doesn't 
reconnect in time
+               doReturn(false).when(producer).isConnected();
+               Duration reconnectTimeout = 
StepVerifier.create(sender.sendOne(MessageSpec.of("c")))
+                               .verifyError(IllegalStateException.class);
+
+               assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4), 
Duration.ofSeconds(5));
+       }
+
 }

Reply via email to