This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 8c8cf9f Add javadoc to Producer cache related classes (#51)
8c8cf9f is described below
commit 8c8cf9fb633a2600ea5ef04cba41cd5c06e306b2
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Nov 29 10:53:14 2022 +0100
Add javadoc to Producer cache related classes (#51)
---
.../AdaptedReactivePulsarClientFactory.java | 24 +++++++++++++++
.../client/adapter/ProducerCacheProvider.java | 18 +++++++++++
.../adapter/ProducerCacheProviderFactory.java | 5 ++++
.../adapter/AdapterImplementationFactory.java | 35 ++++++++++++++++++++++
.../ConcurrentHashMapProducerCacheProvider.java | 8 +++++
.../CaffeineProducerCacheProvider.java | 18 +++++++++++
.../CaffeineProducerCacheProviderFactory.java | 6 ++++
7 files changed, 114 insertions(+)
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
index fe730d1..761d888 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
@@ -16,13 +16,22 @@
package org.apache.pulsar.reactive.client.adapter;
+import java.util.ServiceLoader;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import
org.apache.pulsar.reactive.client.internal.adapter.AdapterImplementationFactory;
+import
org.apache.pulsar.reactive.client.internal.adapter.ConcurrentHashMapProducerCacheProvider;
+/**
+ * Class to create {@link ReactivePulsarClient} and {@link
ReactiveMessageSenderCache}
+ * instances.
+ *
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
public final class AdaptedReactivePulsarClientFactory {
private AdaptedReactivePulsarClientFactory() {
@@ -49,10 +58,25 @@ public final class AdaptedReactivePulsarClientFactory {
return
AdapterImplementationFactory.createReactivePulsarClient(pulsarClientSupplier);
}
+ /**
+ * Creates a {@link ReactiveMessageSenderCache} adapting the provided
+ * {@link ProducerCacheProvider}.
+ * @param producerCacheProvider a ProducerCacheProvider instance
+ * @return a ReactiveMessageSenderCache instance
+ */
public static ReactiveMessageSenderCache
createCache(ProducerCacheProvider producerCacheProvider) {
return
AdapterImplementationFactory.createCache(producerCacheProvider);
}
+ /**
+ * Creates a {@link ReactiveMessageSenderCache}. If a
+ * {@link ProducerCacheProviderFactory} can be loaded by the {@link
ServiceLoader}, it
+ * is used to supply a {@link ProducerCacheProvider} from which the
+ * {@link ReactiveMessageSenderCache} is adapted. Otherwise, the
+ * {@link ReactiveMessageSenderCache} is adapted from a new
+ * {@link ConcurrentHashMapProducerCacheProvider} instance.
+ * @return a ReactiveMessageSenderCache instance
+ */
public static ReactiveMessageSenderCache createCache() {
return AdapterImplementationFactory.createCache();
}
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
index 782bd4e..c5f44cf 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
@@ -19,8 +19,26 @@ package org.apache.pulsar.reactive.client.adapter;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
+
+/**
+ * Cache provider interface used by the {@link ReactiveMessageSender} to cache
the
+ * underlying {@link Producer}s it uses.
+ *
+ * @author Lari Hotari
+ */
public interface ProducerCacheProvider extends AutoCloseable {
+ /**
+ * Gets or creates an entry in the cache.
+ * @param key the cache key
+ * @param <K> the type of the cache key
+ * @param createEntryFunction a function called to create the entry in
the cache if
+ * it's not present.
+ * @param <V> the type of the cache entry value
+ * @return the cache entry value
+ */
<K, V> CompletableFuture<V> getOrCreateCachedEntry(K key, Function<K,
CompletableFuture<V>> createEntryFunction);
}
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
index f3e496a..b9c7c17 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
@@ -18,6 +18,11 @@ package org.apache.pulsar.reactive.client.adapter;
import java.util.function.Supplier;
+/**
+ * Interface to supply instances of {@link ProducerCacheProvider}.
+ *
+ * @author Lari Hotari
+ */
public interface ProducerCacheProviderFactory extends
Supplier<ProducerCacheProvider> {
}
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
index 5c65b43..9f43a9f 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
@@ -28,6 +28,12 @@ import
org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import reactor.core.publisher.Mono;
+/**
+ * Adapter implementation for {@link ReactivePulsarClient} based on {@link
PulsarClient}.
+ *
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
public final class AdapterImplementationFactory {
private AdapterImplementationFactory() {
@@ -47,18 +53,47 @@ public final class AdapterImplementationFactory {
}
}
+ /**
+ * Creates a ReactivePulsarClient which will lazily call the provided
supplier to get
+ * an instance of a Pulsar Client when needed.
+ * @param pulsarClientSupplier the supplier to use for getting a Pulsar
Client
+ * instance
+ * @return a ReactivePulsarClient instance
+ */
public static ReactivePulsarClient
createReactivePulsarClient(Supplier<PulsarClient> pulsarClientSupplier) {
return new ReactivePulsarResourceAdapterPulsarClient(new
ReactivePulsarResourceAdapter(pulsarClientSupplier));
}
+ /**
+ * Adapts a {@link CompletableFuture} returned by the {@link
PulsarClient} operations
+ * to a {@link Mono}.
+ * @param futureSupplier supplier of the {@link CompletableFuture}
+ * @param <T> the type of the value produced by the future and emitted by the Mono
+ * @return the adapted {@link Mono}
+ */
public static <T> Mono<T> adaptPulsarFuture(Supplier<? extends
CompletableFuture<T>> futureSupplier) {
return PulsarFutureAdapter.adaptPulsarFuture(futureSupplier);
}
+ /**
+ * Creates a {@link ReactiveMessageSenderCache} adapting the provided
+ * {@link ProducerCacheProvider}.
+ * @param producerCacheProvider a ProducerCacheProvider instance
+ * @return a ReactiveMessageSenderCache instance
+ */
public static ReactiveMessageSenderCache
createCache(ProducerCacheProvider producerCacheProvider) {
return new ProducerCache(producerCacheProvider);
}
+ /**
+ * Creates a {@link ReactiveMessageSenderCache}. If a
+ * {@link ProducerCacheProviderFactory} can be loaded by the {@link
ServiceLoader}, it
+ * is used to supply a {@link ProducerCacheProvider} from which the
+ * {@link ReactiveMessageSenderCache} is adapted. Otherwise, the
+ * {@link ReactiveMessageSenderCache} is adapted from a new
+ * {@link ConcurrentHashMapProducerCacheProvider} instance.
+ * @return a ReactiveMessageSenderCache instance
+ */
public static ReactiveMessageSenderCache createCache() {
return new ProducerCache((PRODUCER_CACHE_PROVIDER_FACTORY !=
null) ? PRODUCER_CACHE_PROVIDER_FACTORY.get()
: new ConcurrentHashMapProducerCacheProvider());
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
index a4fb5e1..97a21b5 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
@@ -22,10 +22,18 @@ import java.util.function.Function;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
+/**
+ * Producer cache provider that uses a {@link ConcurrentHashMap} to cache
entries.
+ *
+ * @author Lari Hotari
+ */
public class ConcurrentHashMapProducerCacheProvider implements
ProducerCacheProvider {
private final ConcurrentHashMap<Object, CompletableFuture<Object>>
cache;
+ /**
+ * ConcurrentHashMapProducerCacheProvider's constructor.
+ */
public ConcurrentHashMapProducerCacheProvider() {
this.cache = new ConcurrentHashMap<>();
}
diff --git
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
index d7b3a8c..eaa6522 100644
---
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
+++
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
@@ -28,19 +28,37 @@ import com.github.benmanes.caffeine.cache.Scheduler;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
import reactor.core.scheduler.Schedulers;
+/**
+ * Producer cache provider that uses a Caffeine {@link AsyncCache} to cache
entries.
+ *
+ * @author Lari Hotari
+ */
public class CaffeineProducerCacheProvider implements ProducerCacheProvider {
final AsyncCache<Object, Object> cache;
+ /**
+ * Constructor for CaffeineProducerCacheProvider with default values.
+ */
public CaffeineProducerCacheProvider() {
this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(1)).expireAfterWrite(Duration.ofMinutes(10))
.maximumSize(1000));
}
+ /**
+ * Constructor for CaffeineProducerCacheProvider building the cache
from a
+ * {@link CaffeineSpec}.
+ * @param caffeineSpec the Caffeine spec
+ */
public CaffeineProducerCacheProvider(CaffeineSpec caffeineSpec) {
this(Caffeine.from(caffeineSpec));
}
+ /**
+ * Constructor for CaffeineProducerCacheProvider building the cache
from a Caffeine
+ * cache builder.
+ * @param caffeineBuilder the Caffeine cache builder
+ */
public CaffeineProducerCacheProvider(Caffeine<Object, Object>
caffeineBuilder) {
this.cache =
caffeineBuilder.scheduler(Scheduler.systemScheduler())
.executor(Schedulers.boundedElastic()::schedule).removalListener(this::onRemoval).buildAsync();
diff --git
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
index 733dac3..afd517c 100644
---
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
+++
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
@@ -19,6 +19,12 @@ package org.apache.pulsar.reactive.client.producercache;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProviderFactory;
+/**
+ * {@link ProducerCacheProviderFactory} that creates instances of
+ * {@link CaffeineProducerCacheProvider}.
+ *
+ * @author Lari Hotari
+ */
public class CaffeineProducerCacheProviderFactory implements
ProducerCacheProviderFactory {
@Override