This is an automated email from the ASF dual-hosted git repository.
shayshim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new a470379 - ServiceCacheImpl should allow the self-created executor
service to be closed - Allow an optional executor service for ServiceProvider
new 34fdb43 Merge pull request #343 from naude-r/servicecache_thread_leak
a470379 is described below
commit a470379472589010325918401ddb17913c78c57e
Author: Roelof Naude <[email protected]>
AuthorDate: Wed Jan 22 15:04:37 2020 +0200
- ServiceCacheImpl should allow the self-created executor service to be
closed
- Allow an optional executor service for ServiceProvider
---
.../curator/x/discovery/ServiceCacheBuilder.java | 2 ++
.../x/discovery/ServiceProviderBuilder.java | 26 +++++++++++++++++++++-
.../discovery/details/ServiceCacheBuilderImpl.java | 5 ++---
.../x/discovery/details/ServiceCacheImpl.java | 2 +-
.../details/ServiceProviderBuilderImpl.java | 21 ++++++++++++++++-
.../x/discovery/details/ServiceProviderImpl.java | 25 +++++++++++++++++++--
6 files changed, 73 insertions(+), 8 deletions(-)
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 290d9b1..326a16c 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -45,7 +45,9 @@ public interface ServiceCacheBuilder<T>
*
* @param threadFactory factory
* @return this
+ * @deprecated use {@link #executorService(ExecutorService)} instead
*/
+ @Deprecated
public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory);
/**
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
index 2be311f..02948a3 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
@@ -18,7 +18,9 @@
*/
package org.apache.curator.x.discovery;
+import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
public interface ServiceProviderBuilder<T>
@@ -47,11 +49,14 @@ public interface ServiceProviderBuilder<T>
public ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T>
providerStrategy);
/**
- * optional - the thread factory to use for creating internal threads
+ * optional - the thread factory to use for creating internal threads. The
specified ThreadFactory overrides
+ * any prior ThreadFactory or CloseableExecutorService set on the
ServiceProviderBuilder
*
* @param threadFactory factory to use
* @return this
+ * @deprecated use {@link #executorService(ExecutorService)} instead
*/
+ @Deprecated
public ServiceProviderBuilder<T> threadFactory(ThreadFactory
threadFactory);
/**
@@ -71,4 +76,23 @@ public interface ServiceProviderBuilder<T>
* @return this
*/
public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T>
filter);
+
+ /**
+ * Optional ExecutorService to use for the cache's background thread. The
specified ExecutorService
+ * will be wrapped in a CloseableExecutorService and overrides any prior
ThreadFactory or CloseableExecutorService
+ * set on the ServiceProviderBuilder.
+ *
+ * @param executorService executor service
+ * @return this
+ */
+ public ServiceProviderBuilder<T> executorService(ExecutorService
executorService);
+
+ /**
+ * Optional CloseableExecutorService to use for the cache's background
thread. The specified CloseableExecutorService
+ * overrides any prior ThreadFactory or CloseableExecutorService set on
the ServiceProviderBuilder.
+ *
+ * @param executorService an instance of CloseableExecutorService
+ * @return this
+ */
+ public ServiceProviderBuilder<T> executorService(CloseableExecutorService
executorService);
}
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 8922233..7647c0f 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -77,6 +77,7 @@ class ServiceCacheBuilderImpl<T> implements
ServiceCacheBuilder<T>
* @return this
*/
@Override
+ @Deprecated
public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory)
{
this.threadFactory = threadFactory;
@@ -92,9 +93,7 @@ class ServiceCacheBuilderImpl<T> implements
ServiceCacheBuilder<T>
*/
@Override
public ServiceCacheBuilder<T> executorService(ExecutorService
executorService) {
- this.executorService = new CloseableExecutorService(executorService);
- this.threadFactory = null;
- return this;
+ return executorService(new CloseableExecutorService(executorService,
false));
}
/**
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..05df301 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -61,7 +61,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>,
PathChildrenCacheLi
private static CloseableExecutorService convertThreadFactory(ThreadFactory
threadFactory)
{
Preconditions.checkNotNull(threadFactory, "threadFactory cannot be
null");
- return new
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory));
+ return new
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory),
true);
}
ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name,
ThreadFactory threadFactory)
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
index f094c59..e36700b 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.curator.x.discovery.details;
import com.google.common.collect.Lists;
+import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.DownInstancePolicy;
import org.apache.curator.x.discovery.InstanceFilter;
import org.apache.curator.x.discovery.ProviderStrategy;
@@ -26,6 +27,7 @@ import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.ServiceProviderBuilder;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
/**
@@ -37,12 +39,13 @@ class ServiceProviderBuilderImpl<T> implements
ServiceProviderBuilder<T>
private String serviceName;
private ProviderStrategy<T> providerStrategy;
private ThreadFactory threadFactory;
+ private CloseableExecutorService executorService;
private List<InstanceFilter<T>> filters = Lists.newArrayList();
private DownInstancePolicy downInstancePolicy = new DownInstancePolicy();
public ServiceProvider<T> build()
{
- return new ServiceProviderImpl<T>(discovery, serviceName,
providerStrategy, threadFactory, filters, downInstancePolicy);
+ return new ServiceProviderImpl<T>(discovery, serviceName,
providerStrategy, threadFactory, executorService, filters, downInstancePolicy);
}
ServiceProviderBuilderImpl(ServiceDiscoveryImpl<T> discovery)
@@ -83,9 +86,11 @@ class ServiceProviderBuilderImpl<T> implements
ServiceProviderBuilder<T>
* @return this
*/
@Override
+ @Deprecated
public ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory)
{
this.threadFactory = threadFactory;
+ this.executorService = null;
return this;
}
@@ -102,4 +107,18 @@ class ServiceProviderBuilderImpl<T> implements
ServiceProviderBuilder<T>
filters.add(filter);
return this;
}
+
+ @Override
+ public ServiceProviderBuilder<T> executorService(ExecutorService
executorService)
+ {
+ return executorService(new CloseableExecutorService(executorService));
+ }
+
+ @Override
+ public ServiceProviderBuilder<T> executorService(CloseableExecutorService
executorService)
+ {
+ this.executorService = executorService;
+ this.threadFactory = null;
+ return this;
+ }
}
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index 2ab1434..d9787e4 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -19,21 +19,24 @@
package org.apache.curator.x.discovery.details;
import com.google.common.collect.Lists;
+import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.DownInstancePolicy;
import org.apache.curator.x.discovery.InstanceFilter;
import org.apache.curator.x.discovery.ProviderStrategy;
import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceCacheBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
/**
* The main interface for Service Discovery. Encapsulates the discovery
service for a particular
- * named service along with a provider strategy.
+ * named service along with a provider strategy.
*/
public class ServiceProviderImpl<T> implements ServiceProvider<T>
{
@@ -45,11 +48,29 @@ public class ServiceProviderImpl<T> implements
ServiceProvider<T>
public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory,
List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy)
{
+ this(discovery, serviceName, providerStrategy, threadFactory, null,
filters, downInstancePolicy);
+ }
+
+ public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String
serviceName, ProviderStrategy<T> providerStrategy, CloseableExecutorService
executorService, List<InstanceFilter<T>> filters, DownInstancePolicy
downInstancePolicy)
+ {
+ this(discovery, serviceName, providerStrategy, null, executorService,
filters, downInstancePolicy);
+ }
+
+ protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory,
CloseableExecutorService executorService, List<InstanceFilter<T>> filters,
DownInstancePolicy downInstancePolicy)
+ {
this.discovery = discovery;
this.providerStrategy = providerStrategy;
downInstanceManager = new DownInstanceManager<T>(downInstancePolicy);
- cache =
discovery.serviceCacheBuilder().name(serviceName).threadFactory(threadFactory).build();
+ final ServiceCacheBuilder builder =
discovery.serviceCacheBuilder().name(serviceName);
+ if (executorService != null)
+ {
+ builder.executorService(executorService);
+ } else
+ {
+ builder.threadFactory(threadFactory);
+ }
+ cache = builder.build();
ArrayList<InstanceFilter<T>> localFilters =
Lists.newArrayList(filters);
localFilters.add(downInstanceManager);