Repository: incubator-gobblin Updated Branches: refs/heads/master f51bf00b4 -> d323f6022
[GOBBLIN-435] Fix data publisher created from job broker not closed Closes #2312 from zxcware/audit Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d323f602 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d323f602 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d323f602 Branch: refs/heads/master Commit: d323f60222534ddff42beaf08716040246a999c3 Parents: f51bf00 Author: zhchen <[email protected]> Authored: Thu Mar 22 14:14:53 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Mar 22 14:14:53 2018 -0700 ---------------------------------------------------------------------- .../gobblin/publisher/DataPublisherFactory.java | 3 +- .../publisher/DataPublisherFactoryTest.java | 39 +++++++++++++++----- .../broker/SharedResourcesBrokerImpl.java | 4 ++ 3 files changed, 35 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java index 8d77fd6..ea228a7 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java @@ -78,6 +78,7 @@ public class DataPublisherFactory<S extends ScopeType<S>> State state = key.getState(); Class<? extends DataPublisher> dataPublisherClass = (Class<? extends DataPublisher>) Class .forName(publisherClassName); + log.info("Creating data publisher with class {} in scope {}. ", publisherClassName, config.getScope().toString()); DataPublisher publisher = DataPublisher.getInstance(dataPublisherClass, state); @@ -97,6 +98,6 @@ public class DataPublisherFactory<S extends ScopeType<S>> @Override public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, DataPublisherKey> config) { - return broker.selfScope().getType().rootScope(); + return broker.selfScope().getType(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java index 6f58a50..5c65fc4 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java @@ -43,6 +43,9 @@ import org.apache.gobblin.capability.Capability; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; +import lombok.Getter; + + /** * Tests for DataPublisherFactory */ @@ -76,29 +79,41 @@ public class DataPublisherFactoryTest { SharedResourcesBroker<SimpleScopeType> localBroker1 = broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local1")).build(); - DataPublisher publisher1 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker); - DataPublisher publisher2 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker); + TestThreadsafeDataPublisher publisher1 = (TestThreadsafeDataPublisher)DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker); + TestThreadsafeDataPublisher publisher2 = (TestThreadsafeDataPublisher)DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker); // should get the same publisher Assert.assertEquals(publisher1, publisher2); - DataPublisher publisher3 = - localBroker1.getSharedResource(new DataPublisherFactory<>(), + TestThreadsafeDataPublisher publisher3 = + (TestThreadsafeDataPublisher)localBroker1.getSharedResource(new DataPublisherFactory<>(), new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null)); - // should get the same publisher - Assert.assertEquals(publisher2, publisher3); + // should not get the same publisher + Assert.assertNotEquals(publisher2, publisher3); - DataPublisher publisher4 = - localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(), + TestThreadsafeDataPublisher publisher4 = + (TestThreadsafeDataPublisher)localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(), new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null), SimpleScopeType.LOCAL); - // should get a different publisher - Assert.assertNotEquals(publisher3, publisher4); + // should get the same publisher + Assert.assertEquals(publisher3, publisher4); // Check capabilities Assert.assertTrue(publisher1.supportsCapability(DataPublisher.REUSABLE, Collections.EMPTY_MAP)); Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)); + + // Check data publisher is not closed + Assert.assertFalse(publisher1.isClosed()); + Assert.assertFalse(publisher2.isClosed()); + Assert.assertFalse(publisher3.isClosed()); + Assert.assertFalse(publisher4.isClosed()); + broker.close(); + // Check all publishers are closed + Assert.assertTrue(publisher1.isClosed()); + Assert.assertTrue(publisher2.isClosed()); + Assert.assertTrue(publisher3.isClosed()); + Assert.assertTrue(publisher4.isClosed()); } @Test() @@ -143,6 +158,9 @@ public class DataPublisherFactoryTest { private static class TestNonThreadsafeDataPublisher extends DataPublisher { + @Getter + private boolean isClosed = false; + public TestNonThreadsafeDataPublisher(State state) { super(state); } @@ -161,6 +179,7 @@ public class DataPublisherFactoryTest { @Override public void close() throws IOException { + isClosed = true; } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java index c5031fe..ff066f4 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java @@ -44,6 +44,7 @@ import org.apache.gobblin.util.ConfigUtils; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import lombok.Data; +import lombok.extern.slf4j.Slf4j; /** @@ -55,6 +56,7 @@ import lombok.Data; * SharedResourcesBrokerImpl<MyScopes> scopeBroker = topBroker.newSubscopedBuilder(scope, "scopeId").build(); * </pre> */ +@Slf4j public class SharedResourcesBrokerImpl<S extends ScopeType<S>> implements SharedResourcesBroker<S> { private final DefaultBrokerCache<S> brokerCache; @@ -323,6 +325,8 @@ public class SharedResourcesBrokerImpl<S extends ScopeType<S>> implements Shared @Override public void close() throws IOException { + ScopeInstance<S> scope = this.selfScopeWrapper.getScope(); + log.info("Closing broker with scope {} of id {}.", scope.getType().toString(), scope.getScopeId()); this.brokerCache.close(this.selfScopeWrapper); } }
