This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c8cf9f  Add javadoc to Producer cache related classes (#51)
8c8cf9f is described below

commit 8c8cf9fb633a2600ea5ef04cba41cd5c06e306b2
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Nov 29 10:53:14 2022 +0100

    Add javadoc to Producer cache related classes (#51)
---
 .../AdaptedReactivePulsarClientFactory.java        | 24 +++++++++++++++
 .../client/adapter/ProducerCacheProvider.java      | 18 +++++++++++
 .../adapter/ProducerCacheProviderFactory.java      |  5 ++++
 .../adapter/AdapterImplementationFactory.java      | 35 ++++++++++++++++++++++
 .../ConcurrentHashMapProducerCacheProvider.java    |  8 +++++
 .../CaffeineProducerCacheProvider.java             | 18 +++++++++++
 .../CaffeineProducerCacheProviderFactory.java      |  6 ++++
 7 files changed, 114 insertions(+)

diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
index fe730d1..761d888 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/AdaptedReactivePulsarClientFactory.java
@@ -16,13 +16,22 @@
 
 package org.apache.pulsar.reactive.client.adapter;
 
+import java.util.ServiceLoader;
 import java.util.function.Supplier;
 
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
 import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
 import 
org.apache.pulsar.reactive.client.internal.adapter.AdapterImplementationFactory;
+import 
org.apache.pulsar.reactive.client.internal.adapter.ConcurrentHashMapProducerCacheProvider;
 
+/**
+ * Class to create {@link ReactivePulsarClient} and {@link 
ReactiveMessageSenderCache}.
+ * instances.
+ *
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
 public final class AdaptedReactivePulsarClientFactory {
 
        private AdaptedReactivePulsarClientFactory() {
@@ -49,10 +58,25 @@ public final class AdaptedReactivePulsarClientFactory {
                return 
AdapterImplementationFactory.createReactivePulsarClient(pulsarClientSupplier);
        }
 
+       /**
+        * Creates a {@link ReactiveMessageSenderCache} adapting the provided
+        * {@link ProducerCacheProvider}.
+        * @param producerCacheProvider a ProducerCacheProvider instance
+        * @return a ReactiveMessageSenderCache instance
+        */
        public static ReactiveMessageSenderCache 
createCache(ProducerCacheProvider producerCacheProvider) {
                return 
AdapterImplementationFactory.createCache(producerCacheProvider);
        }
 
+       /**
+        * Creates a {@link ReactiveMessageSenderCache}. If a
+        * {@link ProducerCacheProviderFactory} can be loaded by the {@link 
ServiceLoader}, it
+        * is used to supply a {@link ProducerCacheProvider} from which the
+        * {@link ReactiveMessageSenderCache} is adapted. Otherwise, the
+        * {@link ReactiveMessageSenderCache} is adapted from a new
+        * {@link ConcurrentHashMapProducerCacheProvider} instance.
+        * @return a ReactiveMessageSenderCache instance
+        */
        public static ReactiveMessageSenderCache createCache() {
                return AdapterImplementationFactory.createCache();
        }
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
index 782bd4e..c5f44cf 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProvider.java
@@ -19,8 +19,26 @@ package org.apache.pulsar.reactive.client.adapter;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
+
+/**
+ * Cache provider interface used by the {@link ReactiveMessageSender} to cache 
the
+ * underlying {@link Producer}s it uses.
+ *
+ * @author Lari Hotari
+ */
 public interface ProducerCacheProvider extends AutoCloseable {
 
+       /**
+        * Gets or create an entry in the cache.
+        * @param key the cache key
+        * @param <K> the type of the cache key
+        * @param createEntryFunction a function called to create the entry in 
the cache if
+        * it's not present.
+        * @param <V> the type of the cache entry value
+        * @return the cache entry value
+        */
        <K, V> CompletableFuture<V> getOrCreateCachedEntry(K key, Function<K, 
CompletableFuture<V>> createEntryFunction);
 
 }
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
index f3e496a..b9c7c17 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/adapter/ProducerCacheProviderFactory.java
@@ -18,6 +18,11 @@ package org.apache.pulsar.reactive.client.adapter;
 
 import java.util.function.Supplier;
 
+/**
+ * Interface to supply instances of {@link ProducerCacheProvider}.
+ *
+ * @author Lari Hotari
+ */
 public interface ProducerCacheProviderFactory extends 
Supplier<ProducerCacheProvider> {
 
 }
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
index 5c65b43..9f43a9f 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdapterImplementationFactory.java
@@ -28,6 +28,12 @@ import 
org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
 import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
 import reactor.core.publisher.Mono;
 
+/**
+ * Adapter implementation for {@link ReactivePulsarClient} based on {@link 
PulsarClient}.
+ *
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
 public final class AdapterImplementationFactory {
 
        private AdapterImplementationFactory() {
@@ -47,18 +53,47 @@ public final class AdapterImplementationFactory {
                }
        }
 
+       /**
+        * Creates a ReactivePulsarClient which will lazily call the provided 
supplier to get
+        * an instance of a Pulsar Client when needed.
+        * @param pulsarClientSupplier the supplier to use for getting a Pulsar 
Client
+        * instance
+        * @return a ReactivePulsarClient instance
+        */
        public static ReactivePulsarClient 
createReactivePulsarClient(Supplier<PulsarClient> pulsarClientSupplier) {
                return new ReactivePulsarResourceAdapterPulsarClient(new 
ReactivePulsarResourceAdapter(pulsarClientSupplier));
        }
 
+       /**
+        * Adapts a {@link CompletableFuture} returned by the {@link 
PulsarClient} operations
+        * to a {@link Mono}.
+        * @param futureSupplier supplier of the {@link CompletableFuture}
+        * @param <T> the internal type
+        * @return the adapted {@link Mono}
+        */
        public static <T> Mono<T> adaptPulsarFuture(Supplier<? extends 
CompletableFuture<T>> futureSupplier) {
                return PulsarFutureAdapter.adaptPulsarFuture(futureSupplier);
        }
 
+       /**
+        * Creates a {@link ReactiveMessageSenderCache} adapting the provided
+        * {@link ProducerCacheProvider}.
+        * @param producerCacheProvider a ProducerCacheProvider instance
+        * @return a ReactiveMessageSenderCache instance
+        */
        public static ReactiveMessageSenderCache 
createCache(ProducerCacheProvider producerCacheProvider) {
                return new ProducerCache(producerCacheProvider);
        }
 
+       /**
+        * Creates a {@link ReactiveMessageSenderCache}. If a
+        * {@link ProducerCacheProviderFactory} can be loaded by the {@link 
ServiceLoader}, it
+        * is used to supply a {@link ProducerCacheProvider} from which the
+        * {@link ReactiveMessageSenderCache} is adapted. Otherwise, the
+        * {@link ReactiveMessageSenderCache} is adapted from a new
+        * {@link ConcurrentHashMapProducerCacheProvider} instance.
+        * @return a ReactiveMessageSenderCache instance
+        */
        public static ReactiveMessageSenderCache createCache() {
                return new ProducerCache((PRODUCER_CACHE_PROVIDER_FACTORY != 
null) ? PRODUCER_CACHE_PROVIDER_FACTORY.get()
                                : new ConcurrentHashMapProducerCacheProvider());
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
index a4fb5e1..97a21b5 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
@@ -22,10 +22,18 @@ import java.util.function.Function;
 
 import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
 
+/**
+ * Producer cache provider that uses a {@link ConcurrentHashMap} to cache 
entries.
+ *
+ * @author Lari Hotari
+ */
 public class ConcurrentHashMapProducerCacheProvider implements 
ProducerCacheProvider {
 
        private final ConcurrentHashMap<Object, CompletableFuture<Object>> 
cache;
 
+       /**
+        * ConcurrentHashMapProducerCacheProvider's constructor.
+        */
        public ConcurrentHashMapProducerCacheProvider() {
                this.cache = new ConcurrentHashMap<>();
        }
diff --git 
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
 
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
index d7b3a8c..eaa6522 100644
--- 
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
+++ 
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProvider.java
@@ -28,19 +28,37 @@ import com.github.benmanes.caffeine.cache.Scheduler;
 import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
 import reactor.core.scheduler.Schedulers;
 
+/**
+ * Producer cache provider that uses a Caffeine {@link AsyncCache} to cache 
entries.
+ *
+ * @author Lari Hotari
+ */
 public class CaffeineProducerCacheProvider implements ProducerCacheProvider {
 
        final AsyncCache<Object, Object> cache;
 
+       /**
+        * Constructor for CaffeineProducerCacheProvider with default values.
+        */
        public CaffeineProducerCacheProvider() {
                
this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(1)).expireAfterWrite(Duration.ofMinutes(10))
                                .maximumSize(1000));
        }
 
+       /**
+        * Constructor for CaffeineProducerCacheProvider building the cache 
from a
+        * {@link CaffeineSpec}.
+        * @param caffeineSpec the Caffeine spec
+        */
        public CaffeineProducerCacheProvider(CaffeineSpec caffeineSpec) {
                this(Caffeine.from(caffeineSpec));
        }
 
+       /**
+        * Constructor for CaffeineProducerCacheProvider building the cache 
from a Caffeine
+        * cache builder.
+        * @param caffeineBuilder the Caffeine cache builder
+        */
        public CaffeineProducerCacheProvider(Caffeine<Object, Object> 
caffeineBuilder) {
                this.cache = 
caffeineBuilder.scheduler(Scheduler.systemScheduler())
                                
.executor(Schedulers.boundedElastic()::schedule).removalListener(this::onRemoval).buildAsync();
diff --git 
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
 
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
index 733dac3..afd517c 100644
--- 
a/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
+++ 
b/pulsar-client-reactive-producer-cache-caffeine/src/main/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderFactory.java
@@ -19,6 +19,12 @@ package org.apache.pulsar.reactive.client.producercache;
 import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
 import org.apache.pulsar.reactive.client.adapter.ProducerCacheProviderFactory;
 
+/**
+ * {@link ProducerCacheProviderFactory} that creates instances of
+ * {@link CaffeineProducerCacheProvider}.
+ *
+ * @author Lari Hotari
+ */
 public class CaffeineProducerCacheProviderFactory implements 
ProducerCacheProviderFactory {
 
        @Override

Reply via email to