DL-106: Using a namespace after it is closed will throw AlreadyClosedException
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/ab0868cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/ab0868cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/ab0868cc Branch: refs/heads/master Commit: ab0868cceae28730e103c0cf587497719bbc2758 Parents: a9cbb2c Author: Yiming Zang <yz...@twitter.com> Authored: Fri Aug 12 10:30:39 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Tue Dec 27 16:49:28 2016 -0800 ---------------------------------------------------------------------- .../BKDistributedLogNamespace.java | 43 ++++++++---- .../TestBKDistributedLogNamespace.java | 72 ++++++++++++++++++++ 2 files changed, 101 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ab0868cc/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index 281c637..cae6f6a 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -32,6 +32,7 @@ import com.twitter.distributedlog.bk.LedgerAllocator; import com.twitter.distributedlog.bk.LedgerAllocatorUtils; import com.twitter.distributedlog.callback.NamespaceListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +import com.twitter.distributedlog.exceptions.AlreadyClosedException; import com.twitter.distributedlog.exceptions.DLInterruptedException; import 
com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.LogNotFoundException; @@ -86,6 +87,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static com.twitter.distributedlog.impl.BKDLUtils.*; @@ -317,7 +319,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { private final StatsLogger perLogStatsLogger; private final ReadAheadExceptionsLogger readAheadExceptionsLogger; - protected boolean closed = false; + protected AtomicBoolean closed = new AtomicBoolean(false); private final PermitLimiter writeLimiter; @@ -494,6 +496,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { @Override public void createLog(String logName) throws InvalidStreamNameException, IOException { + checkState(); validateName(logName); URI uri = FutureUtils.result(metadataStore.createLog(logName)); createUnpartitionedStreams(conf, uri, Lists.newArrayList(logName)); @@ -502,6 +505,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { @Override public void deleteLog(String logName) throws InvalidStreamNameException, LogNotFoundException, IOException { + checkState(); validateName(logName); Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); if (!uri.isPresent()) { @@ -532,6 +536,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { Optional<DynamicDistributedLogConfiguration> dynamicLogConf, Optional<StatsLogger> perStreamStatsLogger) throws InvalidStreamNameException, IOException { + checkState(); validateName(logName); Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); if (!uri.isPresent()) { @@ -549,12 +554,14 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { @Override public boolean logExists(String logName) throws 
IOException, IllegalArgumentException { + checkState(); Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); return uri.isPresent() && checkIfLogExists(conf, uri.get(), logName); } @Override public Iterator<String> getLogs() throws IOException { + checkState(); return FutureUtils.result(metadataStore.getLogs()); } @@ -565,6 +572,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { @Override public synchronized AccessControlManager createAccessControlManager() throws IOException { + checkState(); if (null == accessControlManager) { String aclRootPath = bkdlConfig.getACLRootPath(); // Build the access control manager @@ -614,9 +622,9 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { } private static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName, - DistributedLogConfiguration conf, - String zkServers, - StatsLogger statsLogger) { + DistributedLogConfiguration conf, + String zkServers, + StatsLogger statsLogger) { RetryPolicy retryPolicy = null; if (conf.getZKNumRetries() > 0) { retryPolicy = new BoundExponentialBackoffRetryPolicy( @@ -633,7 +641,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { .statsLogger(statsLogger) .zkAclId(conf.getZkAclId()); LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}," - + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(), + + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() }); return builder; @@ -678,7 +686,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { .featureProvider(featureProviderOptional) .statsLogger(statsLogger); LOG.info("Created shared client builder {} : zkServers = 
{}, ledgersPath = {}, numIOThreads = {}", - new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() }); + new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() }); return builder; } @@ -711,6 +719,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { * @throws IOException */ private <T> T withZooKeeperClient(ZooKeeperClientHandler<T> handler) throws IOException { + checkState(); return handler.handle(sharedWriterZKCForDL); } @@ -815,6 +824,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { Optional<StatsLogger> perStreamStatsLogger) throws InvalidStreamNameException, IOException { // Make sure the name is well formed + checkState(); validateName(nameOfLogStream); DistributedLogConfiguration mergedConfiguration = new DistributedLogConfiguration(); @@ -918,6 +928,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { if (bkdlConfig.isFederatedNamespace()) { throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace"); } + checkState(); validateName(nameOfMetadataNode); return new ZKMetadataAccessor(nameOfMetadataNode, conf, namespace, sharedWriterZKCBuilderForDL, sharedReaderZKCBuilderForDL, statsLogger); @@ -1035,6 +1046,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { }, conf, uri); } + private void checkState() throws IOException { + if (closed.get()) { + LOG.error("BKDistributedLogNamespace {} is already closed", namespace); + throw new AlreadyClosedException("Namespace " + namespace + " is already closed"); + } + } + /** * Close the distributed log manager factory, freeing any resources it may hold. 
*/ @@ -1043,16 +1061,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { ZooKeeperClient writerZKC; ZooKeeperClient readerZKC; AccessControlManager acm; - synchronized (this) { - if (closed) { - return; - } - closed = true; + if (closed.compareAndSet(false, true)) { writerZKC = sharedWriterZKCForBK; readerZKC = sharedReaderZKCForBK; acm = accessControlManager; + } else { + return; } - if (null != acm) { acm.close(); LOG.info("Access Control Manager Stopped."); @@ -1070,11 +1085,11 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { // Shutdown the schedulers SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); LOG.info("Executor Service Stopped."); if (scheduler != readAheadExecutor) { SchedulerUtils.shutdownScheduler(readAheadExecutor, conf.getSchedulerShutdownTimeoutMs(), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); LOG.info("ReadAhead Executor Service Stopped."); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ab0868cc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java index b3724bc..e68b916 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Sets; import com.twitter.distributedlog.callback.NamespaceListener; +import com.twitter.distributedlog.exceptions.AlreadyClosedException; import 
com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.ZKException; @@ -41,6 +42,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -419,4 +421,74 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { testConf.setLedgerAllocatorPoolName(null); validateBadAllocatorConfiguration(testConf, uri); } + + @Test(timeout = 60000) + public void testUseNamespaceAfterCloseShouldFailFast() throws Exception { + URI uri = createDLMURI("/" + runtime.getMethodName()); + BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder() + .conf(conf) + .uri(uri) + .build(); + // before closing the namespace, no exception should be thrown + String logName = "test-stream"; + // create a log + namespace.createLog(logName); + // log exists + Assert.assertTrue(namespace.logExists(logName)); + // create a dlm + DistributedLogManager dlm = namespace.openLog(logName); + // do some writes + BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned()); + for (long i = 0; i < 3; i++) { + LogRecord record = DLMTestUtil.getLargeLogRecordInstance(i); + writer.write(record); + } + writer.closeAndComplete(); + // do some reads + LogReader reader = dlm.getInputStream(0); + for (long i = 0; i < 3; i++) { + Assert.assertEquals(reader.readNext(false).getTransactionId(), i); + } + namespace.deleteLog(logName); + Assert.assertFalse(namespace.logExists(logName)); + + // now try to close the namespace + namespace.close(); + try { + namespace.createLog(logName); + fail("Should throw exception after namespace is closed"); + } catch (AlreadyClosedException e) { + // No-ops + } + try { + namespace.openLog(logName); + fail("Should throw 
exception after namespace is closed"); + } catch (AlreadyClosedException e) { + // No-ops + } + try { + namespace.logExists(logName); + fail("Should throw exception after namespace is closed"); + } catch (AlreadyClosedException e) { + // No-ops + } + try { + namespace.getLogs(); + fail("Should throw exception after namespace is closed"); + } catch (AlreadyClosedException e) { + // No-ops + } + try { + namespace.deleteLog(logName); + fail("Should throw exception after namespace is closed"); + } catch (AlreadyClosedException e) { + // No-ops + } + try { + namespace.createAccessControlManager(); + fail("Should throw exception after namespace is closed"); + } catch (AlreadyClosedException e) { + // No-ops + } + } }