This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 49083d9 Add tests for sender cache (#81)
49083d9 is described below
commit 49083d9d7fc4901b0622a37d747accda16c2c0b9
Author: Christophe Bornet <[email protected]>
AuthorDate: Sat Dec 3 10:53:28 2022 +0100
Add tests for sender cache (#81)
---
.../adapter/AdaptedReactiveMessageSenderTest.java | 145 +++++++++++++++++++++
1 file changed, 145 insertions(+)
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index ea570f0..8d24258 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -17,11 +17,13 @@
package org.apache.pulsar.reactive.client.internal.adapter;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
@@ -41,8 +43,13 @@ import
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.InOrder;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;
@@ -55,6 +62,7 @@ import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
@@ -173,4 +181,141 @@ class AdaptedReactiveMessageSenderTest {
inOrder.verify(typedMessageBuilder2).sendAsync();
}
+ @ParameterizedTest
+ @MethodSource
+ void senderCache(String name, ReactiveMessageSenderCache cache) throws
Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ ProducerBase<String> producer = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+ doReturn(true).when(producer).isConnected();
+ TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+ new TypedMessageBuilderImpl<>(producer,
Schema.STRING));
+
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+
+ doReturn(typedMessageBuilder).when(producer).newMessage();
+
+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.STRING), isNull());
+
+ ProducerBase<Integer> producer2 = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
+
doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
+ doReturn(true).when(producer2).isConnected();
+ TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
+ new TypedMessageBuilderImpl<>(producer2,
Schema.INT32));
+
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
+
+ doReturn(typedMessageBuilder2).when(producer2).newMessage();
+
+
doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.INT32), isNull());
+
+ // Sender without cache
+ createSenderAndSendMessages(pulsarClient, Schema.STRING,
"my-topic", null, new String[] { "a", "b", "c" });
+ verify(pulsarClient, times(3)).createProducerAsync(any(),
any(), isNull());
+
+ // Sender with cache
+ createSenderAndSendMessages(pulsarClient, Schema.STRING,
"my-topic", cache, new String[] { "a", "b", "c" });
+ verify(pulsarClient, times(4)).createProducerAsync(any(),
any(), isNull());
+
+ // Other sender with same cache, same Schema, same Producer config
+ createSenderAndSendMessages(pulsarClient, Schema.STRING,
"my-topic", cache, new String[] { "d", "e", "f" });
+ verify(pulsarClient, times(4)).createProducerAsync(any(),
any(), isNull());
+
+ // Other sender with same cache, same Schema, different Producer config
+ createSenderAndSendMessages(pulsarClient, Schema.STRING,
"my-other-topic", cache,
+ new String[] { "a", "b", "c" });
+ verify(pulsarClient, times(5)).createProducerAsync(any(),
any(), isNull());
+
+ // Other sender with same cache, different Schema, same Producer config
+ createSenderAndSendMessages(pulsarClient, Schema.INT32,
"my-topic", cache, new Integer[] { 42, 43, 44 });
+ verify(pulsarClient, times(6)).createProducerAsync(any(),
any(), isNull());
+
+ }
+
+ private static Stream<Arguments> senderCache() {
+ return Arrays.asList(
+
Arguments.of("ConcurrentHashMapProducerCacheProvider",
+
AdaptedReactivePulsarClientFactory.createCache(new
ConcurrentHashMapProducerCacheProvider())),
+ Arguments.of("Default",
AdaptedReactivePulsarClientFactory.createCache())).stream();
+ }
+
+ private static <T> void createSenderAndSendMessages(PulsarClient
client, Schema<T> schema, String topic,
+ ReactiveMessageSenderCache cache, T[] values) {
+ assertThat(values).hasSize(3);
+ ReactiveMessageSenderBuilder<T> builder =
AdaptedReactivePulsarClientFactory.create(client)
+ .messageSender(schema).topic(topic);
+ if (cache != null) {
+ builder.cache(cache);
+ }
+ ReactiveMessageSender<T> sender = builder.build();
+
+
sender.sendOne(MessageSpec.of(values[0])).then(sender.sendOne(MessageSpec.of(values[1])))
+
.thenMany(Flux.just(MessageSpec.of(values[2])).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void senderCacheEntryRecreatedIfProducerClosed() throws Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ ProducerBase<String> producer = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+ doReturn(true).when(producer).isConnected();
+ TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+ new TypedMessageBuilderImpl<>(producer,
Schema.STRING));
+
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+ doReturn(typedMessageBuilder).when(producer).newMessage();
+
+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.STRING), isNull());
+
+ ReactiveMessageSenderCache cache =
AdaptedReactivePulsarClientFactory.createCache();
+ createSenderAndSendMessages(pulsarClient, Schema.STRING,
"my-topic", cache, new String[] { "a", "b", "c" });
+
+ ReactiveMessageSender<String> sender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
+
+
sender.sendOne(MessageSpec.of("a")).block(Duration.ofSeconds(5));
+
sender.sendOne(MessageSpec.of("b")).block(Duration.ofSeconds(5));
+
+ verify(pulsarClient).createProducerAsync(any(), any(),
isNull());
+
+ // Disconnect the producer and send a new message
+ doReturn(false).when(producer).isConnected();
+ CompletableFuture<MessageId> messageIdFuture =
sender.sendOne(MessageSpec.of("c")).toFuture();
+
+ Thread.sleep(100);
+
+ // Check that the disconnected producer is flushed and closed and a new producer
+ // is created
+ verify(producer).closeAsync();
+ verify(producer).flushAsync();
+ verify(pulsarClient, times(2)).createProducerAsync(any(),
any(), isNull());
+
+ // Check that we wait for the producer to be reconnected
+ Thread.sleep(1000);
+ assertThat(messageIdFuture.isDone()).isFalse();
+
+ // Re-connect the producer
+ doReturn(true).when(producer).isConnected();
+ messageIdFuture.get(5, TimeUnit.SECONDS);
+
+ verify(pulsarClient, times(2)).createProducerAsync(any(),
any(), isNull());
+
+
sender.sendOne(MessageSpec.of("d")).block(Duration.ofSeconds(5));
+ verify(pulsarClient, times(2)).createProducerAsync(any(),
any(), isNull());
+
+ // Verify that an error is emitted if the producer doesn't reconnect in time
+ doReturn(false).when(producer).isConnected();
+ Duration reconnectTimeout =
StepVerifier.create(sender.sendOne(MessageSpec.of("c")))
+ .verifyError(IllegalStateException.class);
+
+ assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4),
Duration.ofSeconds(5));
+ }
+
}