This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 49ec5b3  Add tests for CaffeineProducerCacheProvider (#84)
49ec5b3 is described below

commit 49ec5b30167fd67fb5621a043f520ad1e4fea31a
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Dec 5 13:08:16 2022 +0100

    Add tests for CaffeineProducerCacheProvider (#84)
---
 .../build.gradle                                   |   1 +
 .../CaffeineProducerCacheProviderTest.java         | 163 +++++++++++++++++++++
 2 files changed, 164 insertions(+)

diff --git a/pulsar-client-reactive-producer-cache-caffeine/build.gradle b/pulsar-client-reactive-producer-cache-caffeine/build.gradle
index c3e0058..825cc01 100644
--- a/pulsar-client-reactive-producer-cache-caffeine/build.gradle
+++ b/pulsar-client-reactive-producer-cache-caffeine/build.gradle
@@ -9,6 +9,7 @@ dependencies {
        testImplementation libs.junit.jupiter
        testImplementation libs.assertj.core
        testImplementation libs.reactor.test
+       testImplementation libs.mockito.core
 }
 
 description = "Caffeine implementation of producer cache"
diff --git a/pulsar-client-reactive-producer-cache-caffeine/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderTest.java b/pulsar-client-reactive-producer-cache-caffeine/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderTest.java
new file mode 100644
index 0000000..a287ec0
--- /dev/null
+++ b/pulsar-client-reactive-producer-cache-caffeine/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderTest.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.producercache;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.CaffeineSpec;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBase;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import 
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
+import org.apache.pulsar.reactive.client.api.MessageSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import reactor.core.publisher.Flux;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+class CaffeineProducerCacheProviderTest {
+
+       @ParameterizedTest
+       @MethodSource
+       void cacheProvider(String name, CaffeineProducerCacheProvider 
cacheProvider) throws Exception {
+               PulsarClientImpl pulsarClient = spy(
+                               (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+
+               ProducerBase<String> producer = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+               doReturn(true).when(producer).isConnected();
+               TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+                               new TypedMessageBuilderImpl<>(producer, 
Schema.STRING));
+               
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+
+               doReturn(typedMessageBuilder).when(producer).newMessage();
+
+               
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.STRING), isNull());
+
+               ProducerBase<Integer> producer2 = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
+               doReturn(true).when(producer2).isConnected();
+               TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
+                               new TypedMessageBuilderImpl<>(producer2, 
Schema.INT32));
+               
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
+
+               doReturn(typedMessageBuilder2).when(producer2).newMessage();
+
+               
doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.INT32), isNull());
+
+               ReactiveMessageSenderCache cache = 
AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
+
+               ReactiveMessageSender<String> sender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+                               
.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
+
+               
sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
+                               
.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
+
+               verify(pulsarClient).createProducerAsync(any(), any(), 
isNull());
+       }
+
+       private static Stream<Arguments> cacheProvider() {
+               return Arrays
+                               .asList(Arguments.of("Default", new 
CaffeineProducerCacheProvider()), Arguments.of(
+                                               "From Caffeine builder",
+                                               new 
CaffeineProducerCacheProvider(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(1))
+                                                               
.expireAfterWrite(Duration.ofMinutes(10)).maximumSize(1000))),
+                                               Arguments.of("From Caffeine 
spec",
+                                                               new 
CaffeineProducerCacheProvider(CaffeineSpec
+                                                                               
.parse("expireAfterAccess=1m,expireAfterWrite=10m,maximumSize=1000"))))
+                               .stream();
+       }
+
+       @Test
+       void loadedByServiceLoader() {
+               ReactiveMessageSenderCache cache = 
AdaptedReactivePulsarClientFactory.createCache();
+               
assertThat(cache).extracting("cacheProvider").isInstanceOf(CaffeineProducerCacheProvider.class);
+       }
+
+       @Test
+       void caffeinePropsAreRespected() throws Exception {
+               PulsarClientImpl pulsarClient = spy(
+                               (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+
+               ProducerBase<String> producer = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+               doReturn(true).when(producer).isConnected();
+               TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+                               new TypedMessageBuilderImpl<>(producer, 
Schema.STRING));
+               
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+
+               doReturn(typedMessageBuilder).when(producer).newMessage();
+
+               
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.STRING), isNull());
+
+               ProducerBase<Integer> producer2 = mock(ProducerBase.class);
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
+               
doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
+               doReturn(true).when(producer2).isConnected();
+               TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
+                               new TypedMessageBuilderImpl<>(producer2, 
Schema.INT32));
+               
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
+
+               doReturn(typedMessageBuilder2).when(producer2).newMessage();
+
+               
doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
+                               eq(Schema.INT32), isNull());
+
+               CaffeineProducerCacheProvider cacheProvider = new 
CaffeineProducerCacheProvider(
+                               
Caffeine.newBuilder().expireAfterWrite(Duration.ofMillis(100)).maximumSize(100));
+               ReactiveMessageSenderCache cache = 
AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
+
+               ReactiveMessageSender<String> sender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+                               
.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
+
+               
sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
+                               
.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
+
+               Thread.sleep(101);
+
+               
sender.sendOne(MessageSpec.of("d")).block(Duration.ofSeconds(5));
+
+               verify(pulsarClient, times(2)).createProducerAsync(any(), 
any(), isNull());
+       }
+
+}

Reply via email to