This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 64ea60d Add shaded Caffeine producer cache provider (#124)
64ea60d is described below
commit 64ea60d5c3422d36b5da8c9fbc0224cf0c330428
Author: Chris Bono <[email protected]>
AuthorDate: Sun Apr 16 17:51:40 2023 -0500
Add shaded Caffeine producer cache provider (#124)
---
README.adoc | 45 +++++-
gradle.properties | 2 +-
pulsar-client-reactive-adapter/build.gradle | 1 +
.../adapter/ReactiveMessageSenderE2ETest.java | 13 +-
.../build.gradle | 84 +++++++++++
.../CaffeineShadedProducerCacheProvider.java | 87 +++++++++++
...CaffeineShadedProducerCacheProviderFactory.java | 21 ++-
...ive.client.adapter.ProducerCacheProviderFactory | 1 +
.../CaffeineShadedProducerCacheProviderTest.java | 159 +++++++++++++++++++++
settings.gradle | 2 +-
10 files changed, 400 insertions(+), 15 deletions(-)
diff --git a/README.adoc b/README.adoc
index feecce1..92af685 100644
--- a/README.adoc
+++ b/README.adoc
@@ -80,7 +80,9 @@ messageId.subscribe(System.out::println);
=== Sending messages with cached producer
-By default a ConcurrentHashMap based cache is used. It's recommended to use a
more advanced cache based on Caffeine. The cache will get used as the default
implementation when it is on the classpath.
+By default, a ConcurrentHashMap based cache is used.
+It's recommended to use a more advanced cache based on Caffeine.
+The cache will get used as the default implementation when it is on the
classpath.
Adding Caffeine based producer cache with Gradle:
@@ -126,10 +128,47 @@ Mono<MessageId> messageId = messageSender
messageId.subscribe(System.out::println);
----
-It is recommended to use a cached producer in most cases. The cache enables
reusing the Pulsar Producer instance and related resources across multiple
message sending calls.
+It is recommended to use a cached producer in most cases.
+The cache enables reusing the Pulsar Producer instance and related resources
across multiple message sending calls.
This improves performance since a producer won't have to be created and closed
before and after sending a message.
-The adapter library implementation together with the cache implementation will
also enable reactive backpressure for sending messages. The `maxInflight`
setting will limit the number of messages that are pending from the client to
the broker. The solution will limit reactive streams subscription requests to
keep the number of pending messages under the defined limit. This limit is
per-topic and impacts the local JVM only.
+The adapter library implementation together with the cache implementation will
also enable reactive backpressure for sending messages.
+The `maxInflight` setting will limit the number of messages that are pending
from the client to the broker.
+The solution will limit reactive streams subscription requests to keep the
number of pending messages under the defined limit.
+This limit is per-topic and impacts the local JVM only.
+
+=== Shaded version of Caffeine
+A version of the provider is available that shades it usage of Caffeine.
+This is useful in scenarios where there is another version of Caffeine
required in your application or if you do not want Caffeine on the classpath.
+
+Adding shaded Caffeine based producer cache with Gradle:
+
+[source,groovy,subs="verbatim,attributes"]
+----
+dependencies {
+ implementation
"org.apache.pulsar:pulsar-client-reactive-adapter:{latest_version}"
+ implementation
"org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded:{latest_version}"
+}
+----
+
+Adding shaded Caffeine based producer cache with Maven:
+
+[source,xml,subs="verbatim,attributes"]
+----
+<dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-reactive-adapter</artifactId>
+ <version>{latest_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+
<artifactId>pulsar-client-reactive-producer-cache-caffeine-shaded</artifactId>
+ <version>{latest_version}</version>
+ </dependency>
+</dependencies>
+----
+
=== Reading messages
diff --git a/gradle.properties b/gradle.properties
index 3e4584e..8cbe804 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,4 +17,4 @@
# under the License.
#
-version=0.2.0
+version=0.2.1-SNAPSHOT
diff --git a/pulsar-client-reactive-adapter/build.gradle
b/pulsar-client-reactive-adapter/build.gradle
index b18bb3d..5021029 100644
--- a/pulsar-client-reactive-adapter/build.gradle
+++ b/pulsar-client-reactive-adapter/build.gradle
@@ -37,6 +37,7 @@ dependencies {
testImplementation libs.mockito.inline
intTestImplementation
project(':pulsar-client-reactive-producer-cache-caffeine')
+ intTestImplementation project(path:
':pulsar-client-reactive-producer-cache-caffeine-shaded', configuration:
'shadow')
intTestImplementation libs.junit.jupiter
intTestImplementation libs.testcontainers.pulsar
intTestImplementation libs.assertj.core
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index d094a34..5415f2f 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -36,6 +36,7 @@ import
org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import
org.apache.pulsar.reactive.client.internal.adapter.ConcurrentHashMapProducerCacheProvider;
import
org.apache.pulsar.reactive.client.producercache.CaffeineProducerCacheProvider;
+import
org.apache.pulsar.reactive.client.producercache.CaffeineShadedProducerCacheProvider;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -46,12 +47,14 @@ import static org.assertj.core.api.Assertions.assertThat;
class ReactiveMessageSenderE2ETest {
private static Stream<Arguments>
shouldSendMessageToTopicWithCachedProducer() {
- return Arrays
-
.asList(Arguments.of("ConcurrentHashMapProducerCacheProvider",
+ return Arrays.asList(
+
Arguments.of("ConcurrentHashMapProducerCacheProvider",
AdaptedReactivePulsarClientFactory.createCache(new
ConcurrentHashMapProducerCacheProvider())),
- Arguments.of("Default",
AdaptedReactivePulsarClientFactory.createCache()),
-
Arguments.of("CaffeineProducerCacheProvider",
-
AdaptedReactivePulsarClientFactory.createCache(new
CaffeineProducerCacheProvider())))
+ Arguments.of("Default",
AdaptedReactivePulsarClientFactory.createCache()),
+ Arguments.of("CaffeineProducerCacheProvider",
+
AdaptedReactivePulsarClientFactory.createCache(new
CaffeineProducerCacheProvider())),
+
Arguments.of("CaffeineShadedProducerCacheProvider",
+
AdaptedReactivePulsarClientFactory.createCache(new
CaffeineShadedProducerCacheProvider())))
.stream();
}
diff --git a/pulsar-client-reactive-producer-cache-caffeine-shaded/build.gradle
b/pulsar-client-reactive-producer-cache-caffeine-shaded/build.gradle
new file mode 100644
index 0000000..e0c639b
--- /dev/null
+++ b/pulsar-client-reactive-producer-cache-caffeine-shaded/build.gradle
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+ id 'pulsar-client-reactive.codestyle-conventions'
+ id 'pulsar-client-reactive.library-conventions'
+ id 'com.github.johnrengelman.shadow' version '7.1.2'
+}
+
+dependencies {
+ api project(':pulsar-client-reactive-adapter')
+ implementation libs.caffeine
+ shadow project(':pulsar-client-reactive-adapter')
+ testImplementation libs.junit.jupiter
+ testImplementation libs.assertj.core
+ testImplementation libs.reactor.test
+ testImplementation libs.mockito.core
+}
+
+description = "Caffeine (shaded) implementation of producer cache"
+
+jar {
+ archiveClassifier.set('original')
+}
+
+shadowJar {
+ archiveClassifier.set(null)
+ dependsOn(project.tasks.jar)
+ manifest {
+ inheritFrom project.tasks.jar.manifest
+ }
+ relocate 'com.github.benmanes.caffeine',
'org.springframework.pulsar.shade.com.github.benmanes.caffeine'
+ relocate 'com.google', 'org.springframework.pulsar.shade.com.google'
+ relocate 'org.checkerframework',
'org.springframework.pulsar.shade.org.checkerframework'
+ dependencies {
+ exclude(dependency {
+ !['com.github.ben-manes.caffeine',
'org.checkerframework', 'com.google.errorprone'].contains(it.moduleGroup)
+ })
+ }
+}
+
+tasks.build.dependsOn tasks.shadowJar
+
+// disable module metadata - otherwise original jar will be used when published
+tasks.withType(GenerateModuleMetadata) {
+ enabled = false
+}
+
+// delay the maven publishing - instead add shadowJar to the publication
+components.java.withVariantsFromConfiguration(configurations.shadowRuntimeElements)
{
+ skip()
+}
+
+publishing {
+ publications {
+ mavenJava {
+ artifact(shadowJar)
+ pom.withXml {
+ Node pomNode = asNode()
+ pomNode.dependencies.'*'.findAll() {
+ it.artifactId.text() == 'caffeine'
+ }.each() {
+ it.parent().remove(it)
+ }
+ }
+ }
+ }
+}
diff --git
a/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProvider.java
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProvider.java
new file mode 100644
index 0000000..fa45acd
--- /dev/null
+++
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.producercache;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Producer cache provider that uses a shaded Caffeine {@link AsyncCache} to
cache
+ * entries.
+ */
+public class CaffeineShadedProducerCacheProvider implements
ProducerCacheProvider {
+
+ private final AsyncCache<Object, Object> cache;
+
+ /**
+ * Create a cache provider instance with default values.
+ */
+ public CaffeineShadedProducerCacheProvider() {
+ this(Duration.ofMinutes(1), Duration.ofMinutes(10), 1000L, 50);
+ }
+
+ /**
+ * Create a cache provider instance with the specified options.
+ * @param cacheExpireAfterAccess time period after last access to
expire unused
+ * entries in the cache
+ * @param cacheExpireAfterWrite time period after last write to expire
unused entries
+ * in the cache
+ * @param cacheMaximumSize maximum size of cache (entries)
+ * @param cacheInitialCapacity the initial size of cache
+ */
+ public CaffeineShadedProducerCacheProvider(Duration
cacheExpireAfterAccess, Duration cacheExpireAfterWrite,
+ Long cacheMaximumSize, Integer cacheInitialCapacity) {
+ this.cache =
Caffeine.newBuilder().expireAfterAccess(cacheExpireAfterAccess)
+
.expireAfterWrite(cacheExpireAfterWrite).maximumSize(cacheMaximumSize)
+
.initialCapacity(cacheInitialCapacity).scheduler(Scheduler.systemScheduler())
+
.executor(Schedulers.boundedElastic()::schedule).removalListener(this::onRemoval).buildAsync();
+ }
+
+ private void onRemoval(Object key, Object entry, RemovalCause cause) {
+ if (entry instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) entry).close();
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ public void close() {
+ this.cache.synchronous().invalidateAll();
+ }
+
+ @Override
+ public <K, V> CompletableFuture<V> getOrCreateCachedEntry(K key,
+ Function<K, CompletableFuture<V>> createEntryFunction) {
+ return (CompletableFuture<V>) this.cache.get(key,
+ (__, ___) -> (CompletableFuture)
createEntryFunction.apply(key));
+ }
+
+}
diff --git a/settings.gradle
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderFactory.java
similarity index 60%
copy from settings.gradle
copy to
pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderFactory.java
index b1cd019..d98fcf2 100644
--- a/settings.gradle
+++
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderFactory.java
@@ -17,9 +17,20 @@
* under the License.
*/
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
-include 'pulsar-client-reactive-jackson'
+package org.apache.pulsar.reactive.client.producercache;
+import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
+import org.apache.pulsar.reactive.client.adapter.ProducerCacheProviderFactory;
+
+/**
+ * {@link ProducerCacheProviderFactory} that creates instances of
+ * {@link CaffeineShadedProducerCacheProvider}.
+ */
+public class CaffeineShadedProducerCacheProviderFactory implements
ProducerCacheProviderFactory {
+
+ @Override
+ public ProducerCacheProvider get() {
+ return new CaffeineShadedProducerCacheProvider();
+ }
+
+}
diff --git
a/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/resources/META-INF/services/org.apache.pulsar.reactive.client.adapter.ProducerCacheProviderFactory
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/resources/META-INF/services/org.apache.pulsar.reactive.client.adapter.ProducerCacheProviderFactory
new file mode 100644
index 0000000..b6bcc41
--- /dev/null
+++
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/main/resources/META-INF/services/org.apache.pulsar.reactive.client.adapter.ProducerCacheProviderFactory
@@ -0,0 +1 @@
+org.apache.pulsar.reactive.client.producercache.CaffeineShadedProducerCacheProviderFactory
diff --git
a/pulsar-client-reactive-producer-cache-caffeine-shaded/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderTest.java
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderTest.java
new file mode 100644
index 0000000..8f7518b
--- /dev/null
+++
b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.producercache;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBase;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import
org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
+import org.apache.pulsar.reactive.client.api.MessageSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import reactor.core.publisher.Flux;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+class CaffeineShadedProducerCacheProviderTest {
+
+ @ParameterizedTest
+ @MethodSource
+ void cacheProvider(String name, CaffeineShadedProducerCacheProvider
cacheProvider) throws Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ ProducerBase<String> producer = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+ doReturn(true).when(producer).isConnected();
+ TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+ new TypedMessageBuilderImpl<>(producer,
Schema.STRING));
+
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+
+ doReturn(typedMessageBuilder).when(producer).newMessage();
+
+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.STRING), isNull());
+
+ ProducerBase<Integer> producer2 = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
+
doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
+ doReturn(true).when(producer2).isConnected();
+ TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
+ new TypedMessageBuilderImpl<>(producer2,
Schema.INT32));
+
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
+
+ doReturn(typedMessageBuilder2).when(producer2).newMessage();
+
+
doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.INT32), isNull());
+
+ ReactiveMessageSenderCache cache =
AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
+
+ ReactiveMessageSender<String> sender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
+
+
sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
+
.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
+
+ verify(pulsarClient).createProducerAsync(any(), any(),
isNull());
+ }
+
+ private static Stream<Arguments> cacheProvider() {
+ return Arrays.asList(Arguments.of("Default <init>", new
CaffeineShadedProducerCacheProvider()), Arguments.of(
+ "Params <init>",
+ new
CaffeineShadedProducerCacheProvider(Duration.ofMinutes(1),
Duration.ofMinutes(10), 1000L, 50)))
+ .stream();
+ }
+
+ @Test
+ void loadedByServiceLoader() {
+ ReactiveMessageSenderCache cache =
AdaptedReactivePulsarClientFactory.createCache();
+
assertThat(cache).extracting("cacheProvider").isInstanceOf(CaffeineShadedProducerCacheProvider.class);
+ }
+
+ @Test
+ void caffeinePropsAreRespected() throws Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ ProducerBase<String> producer = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+ doReturn(true).when(producer).isConnected();
+ TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+ new TypedMessageBuilderImpl<>(producer,
Schema.STRING));
+
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+
+ doReturn(typedMessageBuilder).when(producer).newMessage();
+
+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.STRING), isNull());
+
+ ProducerBase<Integer> producer2 = mock(ProducerBase.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
+
doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
+ doReturn(true).when(producer2).isConnected();
+ TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
+ new TypedMessageBuilderImpl<>(producer2,
Schema.INT32));
+
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
+
+ doReturn(typedMessageBuilder2).when(producer2).newMessage();
+
+
doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
+ eq(Schema.INT32), isNull());
+
+ CaffeineShadedProducerCacheProvider cacheProvider = new
CaffeineShadedProducerCacheProvider(
+ Duration.ofMinutes(1), Duration.ofMillis(100),
100L, 50);
+ ReactiveMessageSenderCache cache =
AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
+
+ ReactiveMessageSender<String> sender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
+
.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
+
+
sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
+
.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
+
+ Thread.sleep(101);
+
+
sender.sendOne(MessageSpec.of("d")).block(Duration.ofSeconds(5));
+
+ verify(pulsarClient, times(2)).createProducerAsync(any(),
any(), isNull());
+ }
+
+}
diff --git a/settings.gradle b/settings.gradle
index b1cd019..7c794b3 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,5 +21,5 @@ rootProject.name = 'pulsar-client-reactive'
include 'pulsar-client-reactive-api'
include 'pulsar-client-reactive-adapter'
include 'pulsar-client-reactive-producer-cache-caffeine'
+include 'pulsar-client-reactive-producer-cache-caffeine-shaded'
include 'pulsar-client-reactive-jackson'
-