Repository: curator Updated Branches: refs/heads/CURATOR-2.0 fc4b905f6 -> cd37f64f2
Merge of CURATOR-295 Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/cd37f64f Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/cd37f64f Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/cd37f64f Branch: refs/heads/CURATOR-2.0 Commit: cd37f64f2372441a4f7003314b65ba890695ba24 Parents: fc4b905 Author: randgalt <[email protected]> Authored: Tue Jul 11 10:04:46 2017 -0500 Committer: randgalt <[email protected]> Committed: Tue Jul 11 10:04:46 2017 -0500 ---------------------------------------------------------------------- .../curator/utils/ExceptionAccumulator.java | 51 ++++++++++++++++++++ .../discovery/details/ServiceDiscoveryImpl.java | 21 +++++--- 2 files changed, 64 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/cd37f64f/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java new file mode 100644 index 0000000..2be2ee8 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java @@ -0,0 +1,51 @@ +package org.apache.curator.utils; + +import com.google.common.base.Throwables; + +/** + * Utility to accumulate multiple potential exceptions into one that + * is thrown at the end + */ +public class ExceptionAccumulator +{ + private volatile Throwable mainEx = null; + + /** + * If there is an accumulated exception, throw it + */ + public void propagate() + { + if ( mainEx != null ) + { + Throwables.propagate(mainEx); + } + } + + /** + * Add an exception into the accumulated exceptions. Note: + * if the exception is {@link java.lang.InterruptedException} + * then <code>Thread.currentThread().interrupt()</code> is called. + * + * @param e the exception + */ + public void add(Throwable e) + { + if ( e instanceof InterruptedException ) + { + if ( mainEx != null ) + { + e.addSuppressed(mainEx); + } + Thread.currentThread().interrupt(); + } + + if ( mainEx == null ) + { + mainEx = e; + } + else + { + mainEx.addSuppressed(e); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/cd37f64f/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 762c9a8..476705c 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.ExceptionAccumulator; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.ServiceCache; @@ -39,7 +40,6 @@ import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.ServiceProviderBuilder; -import org.apache.curator.x.discovery.ServiceType; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -77,9 +77,12 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> log.debug("Re-registering due to reconnection"); reRegisterServices(); } + catch (InterruptedException ex) + { + Thread.currentThread().interrupt(); + } catch ( Exception e ) { - ThreadUtils.checkInterrupted(e); log.error("Could not re-register instances after reconnection", e); } } @@ -140,10 +143,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @Override public void close() throws IOException { - for ( ServiceCache<T> cache : Lists.newArrayList(caches) ) - { - CloseableUtils.closeQuietly(cache); - } + ExceptionAccumulator accumulator = new ExceptionAccumulator(); for ( ServiceProvider<T> provider : Lists.newArrayList(providers) ) { CloseableUtils.closeQuietly(provider); @@ -161,12 +161,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> } catch ( Exception e ) { - ThreadUtils.checkInterrupted(e); + accumulator.add(e); log.error("Could not unregister instance: " + entry.service.getName(), e); } } client.getConnectionStateListenable().removeListener(connectionStateListener); + accumulator.propagate(); } /** @@ -469,9 +470,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> { nodeCache.start(true); } + catch ( InterruptedException e) + { + Thread.currentThread().interrupt(); + return null; + } catch ( Exception e ) { - ThreadUtils.checkInterrupted(e); log.error("Could not start node cache for: " + instance, e); } NodeCacheListener listener = new NodeCacheListener()
