This is an automated email from the ASF dual-hosted git repository.

shayshim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new a470379  - ServiceCacheImpl should allow the self-created executor 
service to be closed - Allow an optional executor service for ServiceProvider
     new 34fdb43  Merge pull request #343 from naude-r/servicecache_thread_leak
a470379 is described below

commit a470379472589010325918401ddb17913c78c57e
Author: Roelof Naude <[email protected]>
AuthorDate: Wed Jan 22 15:04:37 2020 +0200

    - ServiceCacheImpl should allow the self-created executor service to be 
closed
    - Allow an optional executor service for ServiceProvider
---
 .../curator/x/discovery/ServiceCacheBuilder.java   |  2 ++
 .../x/discovery/ServiceProviderBuilder.java        | 26 +++++++++++++++++++++-
 .../discovery/details/ServiceCacheBuilderImpl.java |  5 ++---
 .../x/discovery/details/ServiceCacheImpl.java      |  2 +-
 .../details/ServiceProviderBuilderImpl.java        | 21 ++++++++++++++++-
 .../x/discovery/details/ServiceProviderImpl.java   | 25 +++++++++++++++++++--
 6 files changed, 73 insertions(+), 8 deletions(-)

diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 290d9b1..326a16c 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -45,7 +45,9 @@ public interface ServiceCacheBuilder<T>
      *
      * @param threadFactory factory
      * @return this
+     * @deprecated use {@link #executorService(ExecutorService)} instead
      */
+    @Deprecated
     public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory);
 
     /**
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
index 2be311f..02948a3 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
@@ -18,7 +18,9 @@
  */
 package org.apache.curator.x.discovery;
 
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 public interface ServiceProviderBuilder<T>
@@ -47,11 +49,14 @@ public interface ServiceProviderBuilder<T>
     public ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> 
providerStrategy);
 
     /**
-     * optional - the thread factory to use for creating internal threads
+     * optional - the thread factory to use for creating internal threads. The 
specified ThreadFactory overrides
+     * any prior ThreadFactory or CloseableExecutorService set on the 
ServiceProviderBuilder
      *
      * @param threadFactory factory to use
      * @return this
+     * @deprecated use {@link #executorService(ExecutorService)} instead
      */
+    @Deprecated
     public ServiceProviderBuilder<T> threadFactory(ThreadFactory 
threadFactory);
 
     /**
@@ -71,4 +76,23 @@ public interface ServiceProviderBuilder<T>
      * @return this
      */
     public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> 
filter);
+
+    /**
+     * Optional ExecutorService to use for the cache's background thread. The 
specified ExecutorService
+     * will be wrapped in a CloseableExecutorService and overrides any prior 
ThreadFactory or CloseableExecutorService
+     * set on the ServiceProviderBuilder.
+     *
+     * @param executorService executor service
+     * @return this
+     */
+    public ServiceProviderBuilder<T> executorService(ExecutorService 
executorService);
+
+    /**
+     * Optional CloseableExecutorService to use for the cache's background 
thread. The specified CloseableExecutorService
+     * overrides any prior ThreadFactory or CloseableExecutorService set on 
the ServiceProviderBuilder.
+     *
+     * @param executorService an instance of CloseableExecutorService
+     * @return this
+     */
+    public ServiceProviderBuilder<T> executorService(CloseableExecutorService 
executorService);
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 8922233..7647c0f 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -77,6 +77,7 @@ class ServiceCacheBuilderImpl<T> implements 
ServiceCacheBuilder<T>
      * @return this
      */
     @Override
+    @Deprecated
     public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory)
     {
         this.threadFactory = threadFactory;
@@ -92,9 +93,7 @@ class ServiceCacheBuilderImpl<T> implements 
ServiceCacheBuilder<T>
      */
     @Override
     public ServiceCacheBuilder<T> executorService(ExecutorService 
executorService) {
-        this.executorService = new CloseableExecutorService(executorService);
-        this.threadFactory = null;
-        return this;
+        return executorService(new CloseableExecutorService(executorService, 
false));
     }
 
     /**
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..05df301 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -61,7 +61,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, 
PathChildrenCacheLi
     private static CloseableExecutorService convertThreadFactory(ThreadFactory 
threadFactory)
     {
         Preconditions.checkNotNull(threadFactory, "threadFactory cannot be 
null");
-        return new 
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory));
+        return new 
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), 
true);
     }
 
     ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, 
ThreadFactory threadFactory)
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
index f094c59..e36700b 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.collect.Lists;
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.DownInstancePolicy;
 import org.apache.curator.x.discovery.InstanceFilter;
 import org.apache.curator.x.discovery.ProviderStrategy;
@@ -26,6 +27,7 @@ import org.apache.curator.x.discovery.ServiceProvider;
 import org.apache.curator.x.discovery.ServiceProviderBuilder;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -37,12 +39,13 @@ class ServiceProviderBuilderImpl<T> implements 
ServiceProviderBuilder<T>
     private String serviceName;
     private ProviderStrategy<T> providerStrategy;
     private ThreadFactory threadFactory;
+    private CloseableExecutorService executorService;
     private List<InstanceFilter<T>> filters = Lists.newArrayList();
     private DownInstancePolicy downInstancePolicy = new DownInstancePolicy();
 
     public ServiceProvider<T> build()
     {
-        return new ServiceProviderImpl<T>(discovery, serviceName, 
providerStrategy, threadFactory, filters, downInstancePolicy);
+        return new ServiceProviderImpl<T>(discovery, serviceName, 
providerStrategy, threadFactory, executorService, filters, downInstancePolicy);
     }
 
     ServiceProviderBuilderImpl(ServiceDiscoveryImpl<T> discovery)
@@ -83,9 +86,11 @@ class ServiceProviderBuilderImpl<T> implements 
ServiceProviderBuilder<T>
      * @return this
      */
     @Override
+    @Deprecated
     public ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory)
     {
         this.threadFactory = threadFactory;
+        this.executorService = null;
         return this;
     }
 
@@ -102,4 +107,18 @@ class ServiceProviderBuilderImpl<T> implements 
ServiceProviderBuilder<T>
         filters.add(filter);
         return this;
     }
+
+    @Override
+    public ServiceProviderBuilder<T> executorService(ExecutorService 
executorService)
+    {
+        return executorService(new CloseableExecutorService(executorService));
+    }
+
+    @Override
+    public ServiceProviderBuilder<T> executorService(CloseableExecutorService 
executorService)
+    {
+        this.executorService = executorService;
+        this.threadFactory = null;
+        return this;
+    }
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index 2ab1434..d9787e4 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -19,21 +19,24 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.collect.Lists;
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.DownInstancePolicy;
 import org.apache.curator.x.discovery.InstanceFilter;
 import org.apache.curator.x.discovery.ProviderStrategy;
 import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceCacheBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.ServiceProvider;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 /**
  * The main interface for Service Discovery. Encapsulates the discovery 
service for a particular
- * named service along with a provider strategy. 
+ * named service along with a provider strategy.
  */
 public class ServiceProviderImpl<T> implements ServiceProvider<T>
 {
@@ -45,11 +48,29 @@ public class ServiceProviderImpl<T> implements 
ServiceProvider<T>
 
     public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String 
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, 
List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy)
     {
+        this(discovery, serviceName, providerStrategy, threadFactory, null, 
filters, downInstancePolicy);
+    }
+
+    public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String 
serviceName, ProviderStrategy<T> providerStrategy, CloseableExecutorService 
executorService, List<InstanceFilter<T>> filters, DownInstancePolicy 
downInstancePolicy)
+    {
+        this(discovery, serviceName, providerStrategy, null, executorService, 
filters, downInstancePolicy);
+    }
+
+    protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String 
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, 
CloseableExecutorService executorService, List<InstanceFilter<T>> filters, 
DownInstancePolicy downInstancePolicy)
+    {
         this.discovery = discovery;
         this.providerStrategy = providerStrategy;
 
         downInstanceManager = new DownInstanceManager<T>(downInstancePolicy);
-        cache = 
discovery.serviceCacheBuilder().name(serviceName).threadFactory(threadFactory).build();
+        final ServiceCacheBuilder builder = 
discovery.serviceCacheBuilder().name(serviceName);
+        if (executorService != null)
+        {
+            builder.executorService(executorService);
+        } else
+        {
+            builder.threadFactory(threadFactory);
+        }
+        cache = builder.build();
 
         ArrayList<InstanceFilter<T>> localFilters = 
Lists.newArrayList(filters);
         localFilters.add(downInstanceManager);

Reply via email to