http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java index d1069c3..60bc420 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java @@ -28,6 +28,8 @@ import com.twitter.distributedlog.TestZooKeeperClientBuilder; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.SchedulerUtils; import org.apache.zookeeper.CreateMode; @@ -103,8 +105,10 @@ public class TestDLCK extends TestDistributedLogBase { confLocal.setLogSegmentCacheEnabled(false); URI uri = createDLMURI("/check-and-repair-dl-namespace"); zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - com.twitter.distributedlog.DistributedLogManagerFactory factory = - new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri); + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + .conf(confLocal) + .uri(uri) + .build(); OrderedScheduler scheduler = OrderedScheduler.newBuilder() .name("dlck-tool") .corePoolSize(1) @@ -114,17 +118,20 @@ public class TestDLCK extends TestDistributedLogBase { String streamName = "check-and-repair-dl-namespace"; // Create completed log segments - DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName); + DistributedLogManager dlm = namespace.openLog(streamName); DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 1L, 1L, 10, false); DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 2L, 11L, 10, true); DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 3L, 21L, 10, false); DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 4L, 31L, 10, true); // dryrun - BookKeeperClient bkc = getBookKeeperClient(factory); - DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory, - new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)), - scheduler, bkc, confLocal.getBKDigestPW(), false, false); + DistributedLogAdmin.checkAndRepairDLNamespace( + uri, + namespace, + new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(namespace)), + scheduler, + false, + false); Map<Long, LogSegmentMetadata> segments = getLogSegments(dlm); LOG.info("segments after drynrun {}", segments); @@ -134,10 +141,13 @@ public class TestDLCK extends TestDistributedLogBase { verifyLogSegment(segments, new DLSN(4L, 16L, 0L), 4L, 9, 39L); // check and repair - bkc = getBookKeeperClient(factory); - DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory, - LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)), - scheduler, bkc, confLocal.getBKDigestPW(), false, false); + DistributedLogAdmin.checkAndRepairDLNamespace( + uri, + namespace, + LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(namespace)), + scheduler, + false, + false); segments = getLogSegments(dlm); LOG.info("segments after repair {}", segments); @@ -148,7 +158,7 @@ public class TestDLCK extends TestDistributedLogBase { dlm.close(); SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES); - factory.close(); + namespace.close(); } }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java index 66d7228..1e39e49 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java @@ -24,6 +24,8 @@ import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.TestZooKeeperClientBuilder; import com.twitter.distributedlog.annotations.DistributedLogAnnotations; import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; import com.twitter.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; @@ -90,21 +92,25 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { URI uri = createDLMURI("/change-sequence-number"); zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - com.twitter.distributedlog.DistributedLogManagerFactory factory = - new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri); - com.twitter.distributedlog.DistributedLogManagerFactory readFactory = - new com.twitter.distributedlog.DistributedLogManagerFactory(readConf, uri); + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + .conf(confLocal) + .uri(uri) + .build(); + DistributedLogNamespace readNamespace = DistributedLogNamespaceBuilder.newBuilder() + .conf(readConf) + .uri(uri) + .build(); String streamName = "change-sequence-number"; // create completed log segments - DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName); + DistributedLogManager dlm = namespace.openLog(streamName); DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 4, 10); DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 5, 41, false, 10, true); dlm.close(); // create a reader - DistributedLogManager readDLM = readFactory.createDistributedLogManagerWithSharedClients(streamName); + DistributedLogManager readDLM = readNamespace.openLog(streamName); AsyncLogReader reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN); // read the records @@ -121,7 +127,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { LOG.info("Injecting bad log segment '3'"); - dlm = factory.createDistributedLogManagerWithSharedClients(streamName); + dlm = namespace.openLog(streamName); DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 3L, 5 * 10 + 1, true, 10, false); LOG.info("Injected bad log segment '3'"); @@ -140,8 +146,8 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { LOG.info("Dryrun fix inprogress segment that has lower sequence number"); // Dryrun - DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory, - new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)), streamName, false, false); + DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(namespace, + new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(namespace)), streamName, false, false); try { reader = readDLM.getAsyncLogReader(lastDLSN); @@ -154,8 +160,8 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { LOG.info("Actual run fix inprogress segment that has lower sequence number"); // Actual run - DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory, - LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)), streamName, false, false); + DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(namespace, + LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(namespace)), streamName, false, false); // be able to read more after fix reader = readDLM.getAsyncLogReader(lastDLSN); @@ -182,7 +188,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { readDLM.close(); dlm.close(); - factory.close(); - readFactory.close(); + namespace.close(); + readNamespace.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java index 46e8af0..de7016a 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java @@ -531,7 +531,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { children, firstSegmentList); ZooKeeperClientUtils.expireSession(zkc, - DLUtils.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds()); + BKNamespaceDriver.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds()); logger.info("Create another {} segments.", numSegments); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java index da9f577..c9a2e5b 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java @@ -174,7 +174,7 @@ public class TestZKNamespaceWatcher extends TestDistributedLogBase { createLogInNamespace(uri, "test2"); latches[2].await(); assertEquals(2, receivedLogs.get().size()); - ZooKeeperClientUtils.expireSession(zkc, DLUtils.getZKServersFromDLUri(uri), zkSessionTimeoutMs); + ZooKeeperClientUtils.expireSession(zkc, BKNamespaceDriver.getZKServersFromDLUri(uri), zkSessionTimeoutMs); latches[3].await(); assertEquals(2, receivedLogs.get().size()); createLogInNamespace(uri, "test3"); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java index 673d856..0ce9f46 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java @@ -30,6 +30,7 @@ import com.twitter.distributedlog.ZooKeeperClientUtils; import com.twitter.distributedlog.callback.NamespaceListener; import com.twitter.distributedlog.exceptions.LogExistsException; import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.metadata.LogMetadataStore; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; @@ -422,7 +423,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { TestNamespaceListenerWithExpectedSize listener = new TestNamespaceListenerWithExpectedSize(2 * maxLogsPerSubnamespace + 1); metadataStore.registerNamespaceListener(listener); - ZooKeeperClientUtils.expireSession(zkc, DLUtils.getZKServersFromDLUri(uri), zkSessionTimeoutMs); + ZooKeeperClientUtils.expireSession(zkc, BKNamespaceDriver.getZKServersFromDLUri(uri), zkSessionTimeoutMs); String testLogName = "test-log-name"; allLogs.add(testLogName); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java index 4cf86fa..183a405 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java @@ -30,10 +30,13 @@ import com.twitter.distributedlog.LogRecord; import com.twitter.distributedlog.LogRecordWithDLSN; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.TestDistributedLogBase; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.ZooKeeperClientBuilder; import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; import com.twitter.distributedlog.exceptions.ReadCancelledException; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Utils; @@ -59,10 +62,17 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { public TestName runtime = new TestName(); private OrderedScheduler scheduler; private BookKeeperClient bkc; + private ZooKeeperClient zkc; @Before public void setup() throws Exception { super.setup(); + zkc = ZooKeeperClientBuilder.newBuilder() + .name("test-zk") + .zkServers(bkutil.getZkServers()) + .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) + .zkAclId(conf.getZkAclId()) + .build(); bkc = BookKeeperClientBuilder.newBuilder() .name("test-bk") .dlConfig(conf) @@ -83,6 +93,9 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { if (null != scheduler) { scheduler.shutdown(); } + if (null != zkc) { + zkc.close(); + } super.teardown(); } @@ -91,7 +104,14 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { DistributedLogConfiguration conf) throws Exception { LogSegmentEntryStore store = new BKLogSegmentEntryStore( - conf, bkc, scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL); + conf, + ConfUtils.getConstDynConf(conf), + zkc, + bkc, + scheduler, + null, + NullStatsLogger.INSTANCE, + AsyncFailureInjector.NULL); return (BKLogSegmentEntryReader) FutureUtils.result(store.openReader(segment, startEntryId)); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java index 41544d6..1b19b2e 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java @@ -17,16 +17,16 @@ */ package com.twitter.distributedlog.impl.metadata; -import com.twitter.distributedlog.TestZooKeeperClientBuilder; -import com.twitter.distributedlog.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.DLMetadata; import com.google.common.collect.Lists; import com.twitter.distributedlog.DLMTestUtil; import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.MetadataAccessor; +import com.twitter.distributedlog.TestZooKeeperClientBuilder; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; +import com.twitter.distributedlog.metadata.DLMetadata; import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.DistributedLogConstants; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.ZooKeeperClient; @@ -317,9 +317,9 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { .uri(uri) .build(); - DistributedLogManager dlm = namespace.openLog(logName); - dlm.createOrUpdateMetadata(logName.getBytes("UTF-8")); - dlm.close(); + MetadataAccessor accessor = namespace.getNamespaceDriver().getMetadataAccessor(logName); + accessor.createOrUpdateMetadata(logName.getBytes("UTF-8")); + accessor.close(); testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java new file mode 100644 index 0000000..bbabbb2 --- /dev/null +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.metadata; + +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.TestZooKeeperClientBuilder; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.ZkMetadataResolver; +import com.twitter.distributedlog.metadata.DLMetadata; +import com.twitter.distributedlog.util.Utils; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.ZooKeeperClientBuilder; +import com.twitter.distributedlog.ZooKeeperClusterTestCase; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestZkMetadataResolver extends ZooKeeperClusterTestCase { + + private static final BKDLConfig bkdlConfig = new BKDLConfig("127.0.0.1:7000", "ledgers"); + private static final BKDLConfig bkdlConfig2 = new BKDLConfig("127.0.0.1:7000", "ledgers2"); + + private ZooKeeperClient zkc; + private ZkMetadataResolver resolver; + + @Before + public void setup() throws Exception { + zkc = TestZooKeeperClientBuilder.newBuilder() + .uri(createURI("/")) + .sessionTimeoutMs(10000) + .build(); + resolver = new ZkMetadataResolver(zkc); + } + + @After + public void tearDown() throws Exception { + zkc.close(); + } + + private URI createURI(String path) { + return URI.create("distributedlog://127.0.0.1:" + zkPort + path); + } + + @Test(timeout = 60000) + public void testResolveFailures() throws Exception { + // resolve unexisted path + try { + resolver.resolve(createURI("/unexisted/path")); + fail("Should fail if no metadata resolved."); + } catch (IOException e) { + // expected + } + // resolve existed unbound path + Utils.zkCreateFullPathOptimistic(zkc, "/existed/path", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + try { + resolver.resolve(createURI("/existed/path")); + fail("Should fail if no metadata resolved."); + } catch (IOException e) { + // expected + } + } + + @Test(timeout = 60000) + public void testResolve() throws Exception { + DLMetadata dlMetadata = DLMetadata.create(bkdlConfig); + dlMetadata.create(createURI("/messaging/distributedlog-testresolve")); + DLMetadata dlMetadata2 = DLMetadata.create(bkdlConfig2); + dlMetadata2.create(createURI("/messaging/distributedlog-testresolve/child")); + assertEquals(dlMetadata, + resolver.resolve(createURI("/messaging/distributedlog-testresolve"))); + assertEquals(dlMetadata2, + resolver.resolve(createURI("/messaging/distributedlog-testresolve/child"))); + assertEquals(dlMetadata2, + resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/unknown"))); + Utils.zkCreateFullPathOptimistic(zkc, "/messaging/distributedlog-testresolve/child/child2", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertEquals(dlMetadata2, + resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/child2"))); + } + + @Test(timeout = 60000) + public void testEncodeRegionID() throws Exception { + DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); + + URI uri = createURI("/messaging/distributedlog-testencoderegionid/dl1"); + DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")); + meta1.create(uri); + BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read1, dlConf); + assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata()); + + BKDLConfig.clearCachedDLConfigs(); + + DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(true)); + meta2.update(uri); + BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read2, dlConf); + assertTrue(dlConf.getEncodeRegionIDInLogSegmentMetadata()); + + BKDLConfig.clearCachedDLConfigs(); + + DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(false)); + meta3.update(uri); + BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read3, dlConf); + assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata()); + + BKDLConfig.clearCachedDLConfigs(); + } + + @Test(timeout = 60000) + public void testFirstLogSegmentSequenceNumber() throws Exception { + DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); + + URI uri = createURI("/messaging/distributedlog-testfirstledgerseqno/dl1"); + DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")); + meta1.create(uri); + BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read1, dlConf); + assertEquals(DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO, dlConf.getFirstLogSegmentSequenceNumber()); + + BKDLConfig.clearCachedDLConfigs(); + + DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") + .setFirstLogSegmentSeqNo(9999L)); + meta2.update(uri); + BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read2, dlConf); + assertEquals(9999L, dlConf.getFirstLogSegmentSequenceNumber()); + + BKDLConfig.clearCachedDLConfigs(); + + DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") + .setFirstLogSegmentSeqNo(99L)); + meta3.update(uri); + BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read3, dlConf); + assertEquals(99L, dlConf.getFirstLogSegmentSequenceNumber()); + + BKDLConfig.clearCachedDLConfigs(); + } + + @Test(timeout = 60000) + public void testFederatedNamespace() throws Exception { + DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); + + URI uri = createURI("/messaging/distributedlog-testfederatednamespace/dl1"); + DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")); + meta1.create(uri); + BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read1, dlConf); + assertTrue(dlConf.getCreateStreamIfNotExists()); + + BKDLConfig.clearCachedDLConfigs(); + + DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") + .setFederatedNamespace(true)); + meta2.update(uri); + BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read2, dlConf); + assertFalse(dlConf.getCreateStreamIfNotExists()); + + BKDLConfig.clearCachedDLConfigs(); + + DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") + .setFederatedNamespace(false)); + meta3.update(uri); + BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri); + BKDLConfig.propagateConfiguration(read3, dlConf); + // if it is non-federated namespace, it won't change the create stream behavior. + assertFalse(dlConf.getCreateStreamIfNotExists()); + + BKDLConfig.clearCachedDLConfigs(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java index d4c2f31..e3cc239 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java @@ -19,6 +19,7 @@ package com.twitter.distributedlog.metadata; import com.twitter.distributedlog.LocalDLMEmulator; import com.twitter.distributedlog.ZooKeeperClusterTestCase; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.After; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java deleted file mode 100644 index 79fb539..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.TestZooKeeperClientBuilder; -import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.ZooKeeperClusterTestCase; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.net.URI; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class TestZkMetadataResolver extends ZooKeeperClusterTestCase { - - private static final BKDLConfig bkdlConfig = new BKDLConfig("127.0.0.1:7000", "ledgers"); - private static final BKDLConfig bkdlConfig2 = new BKDLConfig("127.0.0.1:7000", "ledgers2"); - - private ZooKeeperClient zkc; - private ZkMetadataResolver resolver; - - @Before - public void setup() throws Exception { - zkc = TestZooKeeperClientBuilder.newBuilder() - .uri(createURI("/")) - .sessionTimeoutMs(10000) - .build(); - resolver = new ZkMetadataResolver(zkc); - } - - @After - public void tearDown() throws Exception { - zkc.close(); - } - - private URI createURI(String path) { - return URI.create("distributedlog://127.0.0.1:" + zkPort + path); - } - - @Test(timeout = 60000) - public void testResolveFailures() throws Exception { - // resolve unexisted path - try { - resolver.resolve(createURI("/unexisted/path")); - fail("Should fail if no metadata resolved."); - } catch (IOException e) { - // expected - } - // resolve existed unbound path - Utils.zkCreateFullPathOptimistic(zkc, "/existed/path", new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - try { - resolver.resolve(createURI("/existed/path")); - fail("Should fail if no metadata resolved."); - } catch (IOException e) { - // expected - } - } - - @Test(timeout = 60000) - public void testResolve() throws Exception { - DLMetadata dlMetadata = DLMetadata.create(bkdlConfig); - dlMetadata.create(createURI("/messaging/distributedlog-testresolve")); - DLMetadata dlMetadata2 = DLMetadata.create(bkdlConfig2); - dlMetadata2.create(createURI("/messaging/distributedlog-testresolve/child")); - assertEquals(dlMetadata, - resolver.resolve(createURI("/messaging/distributedlog-testresolve"))); - assertEquals(dlMetadata2, - resolver.resolve(createURI("/messaging/distributedlog-testresolve/child"))); - assertEquals(dlMetadata2, - resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/unknown"))); - Utils.zkCreateFullPathOptimistic(zkc, "/messaging/distributedlog-testresolve/child/child2", new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEquals(dlMetadata2, - resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/child2"))); - } - - @Test(timeout = 60000) - public void testEncodeRegionID() throws Exception { - DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); - - URI uri = createURI("/messaging/distributedlog-testencoderegionid/dl1"); - DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")); - meta1.create(uri); - BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read1, dlConf); - assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata()); - - BKDLConfig.clearCachedDLConfigs(); - - DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(true)); - meta2.update(uri); - BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read2, dlConf); - assertTrue(dlConf.getEncodeRegionIDInLogSegmentMetadata()); - - BKDLConfig.clearCachedDLConfigs(); - - DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(false)); - meta3.update(uri); - BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read3, dlConf); - assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata()); - - BKDLConfig.clearCachedDLConfigs(); - } - - @Test(timeout = 60000) - public void testFirstLogSegmentSequenceNumber() throws Exception { - DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); - - URI uri = createURI("/messaging/distributedlog-testfirstledgerseqno/dl1"); - DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")); - meta1.create(uri); - BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read1, dlConf); - assertEquals(DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO, dlConf.getFirstLogSegmentSequenceNumber()); - - BKDLConfig.clearCachedDLConfigs(); - - DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") - .setFirstLogSegmentSeqNo(9999L)); - meta2.update(uri); - BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read2, dlConf); - assertEquals(9999L, dlConf.getFirstLogSegmentSequenceNumber()); - - BKDLConfig.clearCachedDLConfigs(); - - DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") - .setFirstLogSegmentSeqNo(99L)); - meta3.update(uri); - BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read3, dlConf); - assertEquals(99L, dlConf.getFirstLogSegmentSequenceNumber()); - - BKDLConfig.clearCachedDLConfigs(); - } - - @Test(timeout = 60000) - public void testFederatedNamespace() throws Exception { - DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); - - URI uri = createURI("/messaging/distributedlog-testfederatednamespace/dl1"); - DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")); - meta1.create(uri); - BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read1, dlConf); - assertTrue(dlConf.getCreateStreamIfNotExists()); - - BKDLConfig.clearCachedDLConfigs(); - - DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") - .setFederatedNamespace(true)); - meta2.update(uri); - BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read2, dlConf); - assertFalse(dlConf.getCreateStreamIfNotExists()); - - BKDLConfig.clearCachedDLConfigs(); - - DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers") - .setFederatedNamespace(false)); - meta3.update(uri); - BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(read3, dlConf); - // if it is non-federated namespace, it won't change the create stream behavior. - assertFalse(dlConf.getCreateStreamIfNotExists()); - - BKDLConfig.clearCachedDLConfigs(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java index 3225ced..a2a0ca6 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java @@ -20,7 +20,7 @@ package com.twitter.distributedlog.service; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.LocalDLMEmulator; import com.twitter.distributedlog.client.routing.SingleHostRoutingService; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; import com.twitter.distributedlog.service.placement.EqualLoadAppraiser; import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java index cfb5b8d..2e49d92 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java @@ -23,12 +23,12 @@ import com.google.common.util.concurrent.RateLimiter; import com.twitter.common.zookeeper.ServerSet; import com.twitter.distributedlog.client.monitor.MonitorServiceClient; import com.twitter.distributedlog.client.serverset.DLZkServerSet; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.service.ClientUtils; import com.twitter.distributedlog.service.DLSocketAddress; import com.twitter.distributedlog.service.DistributedLogClient; import com.twitter.distributedlog.service.DistributedLogClientBuilder; import com.twitter.distributedlog.tools.Tool; -import com.twitter.distributedlog.util.DLUtils; import com.twitter.finagle.builder.ClientBuilder; import com.twitter.finagle.thrift.ClientId$; import com.twitter.util.Await; @@ -269,8 +269,8 @@ public class BalancerTool extends Tool { ClientUtils.buildClient(builder2); try { SimpleBalancer balancer = new SimpleBalancer( - DLUtils.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(), - DLUtils.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight()); + BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(), + BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight()); try { return runBalancer(balancer); } finally { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java index 18b9d1f..4f01bdc 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; import com.twitter.distributedlog.BKDistributedLogNamespace; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.util.DLUtils; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.util.Utils; /** @@ -55,11 +55,12 @@ public class ZKPlacementStateManager implements PlacementStateManager { private boolean watching = false; public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) { - zkClient = BKDistributedLogNamespace.createDLZKClientBuilder( - String.format("dlzk:%s:factory_writer_shared", uri), + String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); + zkClient = BKNamespaceDriver.createZKClientBuilder( + String.format("ZKPlacementStateManager-%s", zkServers), conf, - DLUtils.getZKServersFromDLUri(uri), - statsLogger.scope("dlzk_factory_writer_shared")).build(); + zkServers, + statsLogger.scope("placement_state_manager")).build(); serverLoadPath = uri.getPath() + SERVER_LOAD_DIR; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java index c45e42c..218ea06 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java @@ -30,10 +30,10 @@ import com.twitter.distributedlog.LogRecord; import com.twitter.distributedlog.LogRecordWithDLSN; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.acl.ZKAccessControl; +import com.twitter.distributedlog.impl.acl.ZKAccessControl; import com.twitter.distributedlog.client.routing.LocalRoutingService; import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.service.stream.StreamManagerImpl; import com.twitter.distributedlog.thrift.AccessControlEntry; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java index 20c81f3..87ddec0 100644 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java +++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java @@ -18,12 +18,14 @@ package com.twitter.distributedlog.mapreduce; import com.google.common.collect.Lists; -import com.twitter.distributedlog.BKDistributedLogNamespace; -import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; +import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.LogRecordWithDLSN; import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.impl.BKNamespaceDriver; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAccessor; @@ -57,7 +59,7 @@ public class DistributedLogInputFormat protected Configuration conf; protected DistributedLogConfiguration dlConf; protected URI dlUri; - protected BKDistributedLogNamespace namespace; + protected DistributedLogNamespace namespace; protected String streamName; protected DistributedLogManager dlm; @@ -69,7 +71,7 @@ public class DistributedLogInputFormat dlUri = URI.create(configuration.get(DL_URI, "")); streamName = configuration.get(DL_STREAM, ""); try { - namespace = BKDistributedLogNamespace.newBuilder() + namespace = DistributedLogNamespaceBuilder.newBuilder() .conf(dlConf) .uri(dlUri) .build(); @@ -89,7 +91,7 @@ public class DistributedLogInputFormat throws IOException, InterruptedException { List<LogSegmentMetadata> segments = dlm.getLogSegments(); List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(segments.size()); - BookKeeper bk = namespace.getReaderBKC().get(); + BookKeeper bk = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get(); LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk); final AtomicInteger rcHolder = new AtomicInteger(0); final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null); @@ -121,7 +123,7 @@ public class DistributedLogInputFormat return new LogSegmentReader( streamName, dlConf, - namespace.getReaderBKC().get(), + ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get(), (LogSegmentSplit) inputSplit); } }