Repository: incubator-gobblin Updated Branches: refs/heads/master f7ea77eb9 -> a02073e9d
[GOBBLIN-512] Add containers to hold unchecked exceptions from closing resources in broker Closes #2383 from autumnust/brokerguaranteecloseeverything Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a02073e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a02073e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a02073e9 Branch: refs/heads/master Commit: a02073e9d5cc91f3f759483c3dad8e6ddf011a65 Parents: f7ea77e Author: Lei Sun <[email protected]> Authored: Thu Jun 14 16:00:50 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Thu Jun 14 16:00:50 2018 -0700 ---------------------------------------------------------------------- .../gobblin/broker/DefaultBrokerCache.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a02073e9/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java index 0c001f1..f034114 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java @@ -19,15 +19,19 @@ package org.apache.gobblin.broker; import java.io.Closeable; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; import com.google.common.base.Predicate; +import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; @@ -202,6 +206,7 @@ class DefaultBrokerCache<S extends ScopeType<S>> { */ public void close(ScopeWrapper<S> scope) throws IOException { + List<Throwable> exceptionsList = Lists.newArrayList(); List<Service> awaitShutdown = Lists.newArrayList(); for (Map.Entry<RawJobBrokerKey, Object> entry : Maps.filterKeys(this.sharedResourceCache.asMap(), @@ -211,8 +216,12 @@ class DefaultBrokerCache<S extends ScopeType<S>> { if (entry.getValue() instanceof ResourceInstance) { Object obj = ((ResourceInstance) entry.getValue()).getResource(); - - SharedResourcesBrokerUtils.shutdownObject(obj, log); + // Catch unchecked exception while closing resources, make sure all resources managed by cache are closed. + try { + SharedResourcesBrokerUtils.shutdownObject(obj, log); + } catch (Throwable t) { + exceptionsList.add(t); + } if (obj instanceof Service) { awaitShutdown.add((Service) obj); } @@ -226,6 +235,12 @@ class DefaultBrokerCache<S extends ScopeType<S>> { log.error("Failed to shutdown {}.", service); } } + + // log exceptions while closing resources up. + if (exceptionsList.size() > 0) { + log.error(exceptionsList.stream() + .map(Throwables::getStackTraceAsString).collect(Collectors.joining("\n"))); + } } /**
