DL-118: Stream metadata store followup - rename ZKLogMetadata* to LogMetadata*

As the followup change, this change is to remove 'ZK' from 'ZKLogMetadata*' 
class


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/dc4548be
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/dc4548be
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/dc4548be

Branch: refs/heads/master
Commit: dc4548bebcd8c8ce680fb14cfaf203dd6605fa29
Parents: b91d49a
Author: Sijie Guo <sij...@twitter.com>
Authored: Wed Nov 30 18:17:46 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Dec 29 02:09:00 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKDistributedLogManager.java |  18 +-
 .../twitter/distributedlog/BKLogHandler.java    |   6 +-
 .../distributedlog/BKLogReadHandler.java        |   6 +-
 .../distributedlog/BKLogWriteHandler.java       |   6 +-
 .../impl/ZKLogSegmentMetadataStore.java         |   8 +-
 .../impl/metadata/ZKLogMetadata.java            | 175 -------------------
 .../impl/metadata/ZKLogMetadataForReader.java   | 103 -----------
 .../impl/metadata/ZKLogMetadataForWriter.java   |  64 -------
 .../impl/metadata/ZKLogStreamMetadataStore.java |  53 +++---
 .../logsegment/LogSegmentMetadataStore.java     |   8 +-
 .../distributedlog/metadata/LogMetadata.java    | 175 +++++++++++++++++++
 .../metadata/LogMetadataForReader.java          | 103 +++++++++++
 .../metadata/LogMetadataForWriter.java          |  64 +++++++
 .../metadata/LogStreamMetadataStore.java        |  14 +-
 .../readahead/ReadAheadWorker.java              |   7 +-
 .../TestBKDistributedLogManager.java            |  13 +-
 .../distributedlog/TestLogSegmentsZK.java       |   6 +-
 .../impl/TestZKLogSegmentMetadataStore.java     |  16 +-
 .../impl/metadata/TestZKLogMetadata.java        |  59 -------
 .../metadata/TestZKLogStreamMetadataStore.java  |   5 +-
 .../TestZKLogStreamMetadataStoreUtils.java      |  18 +-
 .../metadata/TestLogMetadata.java               |  59 +++++++
 22 files changed, 494 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 0a34caa..cd3f359 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -34,8 +34,8 @@ import 
com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
 import com.twitter.distributedlog.function.GetVersionedValueFunction;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.metadata.LogMetadataForReader;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
 import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.lock.DistributedLock;
@@ -501,7 +501,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                                                     AsyncNotification 
notification,
                                                     boolean 
deserializeRecordSet,
                                                     boolean 
isHandleForReading) {
-        ZKLogMetadataForReader logMetadata = ZKLogMetadataForReader.of(uri, 
name, streamIdentifier);
+        LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, 
streamIdentifier);
         return new BKLogReadHandler(
                 logMetadata,
                 subscriberId,
@@ -524,7 +524,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
 
     // Create Ledger Allocator
 
-    LedgerAllocator createLedgerAllocator(ZKLogMetadataForWriter logMetadata) 
throws IOException {
+    LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata) 
throws IOException {
         LedgerAllocator ledgerAllocatorDelegator;
         if (!dynConf.getEnableLedgerAllocatorPool()) {
             QuorumConfigProvider quorumConfigProvider =
@@ -558,9 +558,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 name,
                 ownAllocator,
                 conf.getCreateStreamIfNotExists() || ownAllocator
-        ).flatMap(new AbstractFunction1<ZKLogMetadataForWriter, 
Future<BKLogWriteHandler>>() {
+        ).flatMap(new AbstractFunction1<LogMetadataForWriter, 
Future<BKLogWriteHandler>>() {
             @Override
-            public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter 
logMetadata) {
+            public Future<BKLogWriteHandler> apply(LogMetadataForWriter 
logMetadata) {
                 Promise<BKLogWriteHandler> createPromise = new 
Promise<BKLogWriteHandler>();
                 createWriteHandler(logMetadata, lockHandler, createPromise);
                 return createPromise;
@@ -568,7 +568,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         });
     }
 
-    private void createWriteHandler(ZKLogMetadataForWriter logMetadata,
+    private void createWriteHandler(LogMetadataForWriter logMetadata,
                                     boolean lockHandler,
                                     final Promise<BKLogWriteHandler> 
createPromise) {
         // Build the locks
@@ -1374,7 +1374,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
      */
     private SubscriptionStateStore getSubscriptionStateStoreInternal(String 
streamIdentifier, String subscriberId) {
         return new ZKSubscriptionStateStore(writerZKC,
-                ZKLogMetadataForReader.getSubscriberPath(uri, name, 
streamIdentifier, subscriberId));
+                LogMetadataForReader.getSubscriberPath(uri, name, 
streamIdentifier, subscriberId));
     }
 
     @Override
@@ -1390,6 +1390,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
      */
     private SubscriptionsStore getSubscriptionsStoreInternal(String 
streamIdentifier) {
         return new ZKSubscriptionsStore(writerZKC,
-                ZKLogMetadataForReader.getSubscribersPath(uri, name, 
streamIdentifier));
+                LogMetadataForReader.getSubscribersPath(uri, name, 
streamIdentifier));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 4f138f2..caee864 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -23,7 +23,7 @@ import 
com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.metadata.LogMetadata;
 import com.twitter.distributedlog.io.AsyncAbortable;
 import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
@@ -87,7 +87,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
     static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
 
-    protected final ZKLogMetadata logMetadata;
+    protected final LogMetadata logMetadata;
     protected final DistributedLogConfiguration conf;
     protected final BookKeeperClient bookKeeperClient;
     protected final LogStreamMetadataStore streamMetadataStore;
@@ -120,7 +120,7 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
     /**
      * Construct a Bookkeeper journal manager.
      */
-    BKLogHandler(ZKLogMetadata metadata,
+    BKLogHandler(LogMetadata metadata,
                  DistributedLogConfiguration conf,
                  BookKeeperClientBuilder bkcBuilder,
                  LogStreamMetadataStore streamMetadataStore,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 1963172..9cfe1a6 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -35,7 +35,7 @@ import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
+import com.twitter.distributedlog.metadata.LogMetadataForReader;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
@@ -110,7 +110,7 @@ import javax.annotation.Nullable;
 class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener 
{
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
-    protected final ZKLogMetadataForReader logMetadataForReader;
+    protected final LogMetadataForReader logMetadataForReader;
     protected final ReadAheadCache readAheadCache;
     protected final LedgerHandleCache handleCache;
 
@@ -142,7 +142,7 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
     /**
      * Construct a Bookkeeper journal manager.
      */
-    BKLogReadHandler(ZKLogMetadataForReader logMetadata,
+    BKLogReadHandler(LogMetadataForReader logMetadata,
                      Optional<String> subscriberId,
                      DistributedLogConfiguration conf,
                      DynamicDistributedLogConfiguration dynConf,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index f2e30ce..2e31ac8 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -30,7 +30,7 @@ import 
com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.GetLastTxIdFunction;
 import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
@@ -89,7 +89,7 @@ import static 
com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_F
 class BKLogWriteHandler extends BKLogHandler {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
-    protected final ZKLogMetadataForWriter logMetadataForWriter;
+    protected final LogMetadataForWriter logMetadataForWriter;
     protected final DistributedLock lock;
     protected final LedgerAllocator ledgerAllocator;
     protected final MaxTxId maxTxId;
@@ -149,7 +149,7 @@ class BKLogWriteHandler extends BKLogHandler {
     /**
      * Construct a Bookkeeper journal manager.
      */
-    BKLogWriteHandler(ZKLogMetadataForWriter logMetadata,
+    BKLogWriteHandler(LogMetadataForWriter logMetadata,
                       DistributedLogConfiguration conf,
                       BookKeeperClientBuilder bkcBuilder,
                       LogStreamMetadataStore streamMetadataStore,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index 1b831ea..2076dd8 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -25,8 +25,8 @@ import 
com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.metadata.LogMetadata;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
@@ -222,7 +222,7 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
 
     @Override
     public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                                 ZKLogMetadata logMetadata,
+                                                 LogMetadata logMetadata,
                                                  Versioned<Long> lssn,
                                                  
Transaction.OpListener<Version> listener) {
         Version version = lssn.getVersion();
@@ -236,7 +236,7 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
 
     @Override
     public void storeMaxTxnId(Transaction<Object> txn,
-                              ZKLogMetadataForWriter logMetadata,
+                              LogMetadataForWriter logMetadata,
                               Versioned<Long> transactionId,
                               Transaction.OpListener<Version> listener) {
         Version version = transactionId.getVersion();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
deleted file mode 100644
index 37beb16..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import java.net.URI;
-
-/**
- * Class to represent the layout and metadata of the zookeeper-based log 
metadata
- */
-public class ZKLogMetadata {
-
-    protected static String getLogComponentPath(URI uri, String logName, 
String logIdentifier, String component) {
-        return String.format("%s/%s/%s%s", uri.getPath(), logName, 
logIdentifier, component);
-    }
-
-    /**
-     * Get the top stream path for a given log.
-     *
-     * @param uri namespace to store the log
-     * @param logName name of the log
-     * @return top stream path
-     */
-    public static String getLogStreamPath(URI uri, String logName) {
-        return String.format("%s/%s", uri.getPath(), logName);
-    }
-
-    /**
-     * Get the log root path for a given log.
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return log root path
-     */
-    public static String getLogRootPath(URI uri, String logName, String 
logIdentifier) {
-        return getLogComponentPath(uri, logName, logIdentifier, "");
-    }
-
-    /**
-     * Get the logsegments root path for a given log.
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return logsegments root path
-     */
-    public static String getLogSegmentsPath(URI uri, String logName, String 
logIdentifier) {
-        return getLogComponentPath(uri, logName, logIdentifier, 
LOGSEGMENTS_PATH);
-    }
-
-    protected static final int LAYOUT_VERSION = -1;
-    public final static String LOGSEGMENTS_PATH = "/ledgers";
-    public final static String VERSION_PATH = "/version";
-    // writer znodes
-    public final static String MAX_TXID_PATH = "/maxtxid";
-    public final static String LOCK_PATH = "/lock";
-    public final static String ALLOCATION_PATH = "/allocation";
-    // reader znodes
-    public final static String READ_LOCK_PATH = "/readLock";
-
-    protected final URI uri;
-    protected final String logName;
-    protected final String logIdentifier;
-
-    // Root path of the log
-    protected final String logRootPath;
-    // Components
-    protected final String logSegmentsPath;
-    protected final String lockPath;
-    protected final String maxTxIdPath;
-    protected final String allocationPath;
-
-    /**
-     * metadata representation of a log
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     */
-    protected ZKLogMetadata(URI uri,
-                            String logName,
-                            String logIdentifier) {
-        this.uri = uri;
-        this.logName = logName;
-        this.logIdentifier = logIdentifier;
-        this.logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        this.logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
-        this.lockPath = logRootPath + LOCK_PATH;
-        this.maxTxIdPath = logRootPath + MAX_TXID_PATH;
-        this.allocationPath = logRootPath + ALLOCATION_PATH;
-    }
-
-    public URI getUri() {
-        return uri;
-    }
-
-    public String getLogName() {
-        return logName;
-    }
-
-    /**
-     * Get the root path of the log.
-     *
-     * @return root path of the log.
-     */
-    public String getLogRootPath() {
-        return logRootPath;
-    }
-
-    /**
-     * Get the root path for log segments.
-     *
-     * @return root path for log segments
-     */
-    public String getLogSegmentsPath() {
-        return this.logSegmentsPath;
-    }
-
-    /**
-     * Get the path for a log segment of the log.
-     *
-     * @param segmentName
-     *          segment name
-     * @return path for the log segment
-     */
-    public String getLogSegmentPath(String segmentName) {
-        return this.logSegmentsPath + "/" + segmentName;
-    }
-
-    public String getLockPath() {
-        return lockPath;
-    }
-
-    public String getMaxTxIdPath() {
-        return maxTxIdPath;
-    }
-
-    public String getAllocationPath() {
-        return allocationPath;
-    }
-
-    /**
-     * Get the fully qualified name of the log.
-     *
-     * @return fully qualified name
-     */
-    public String getFullyQualifiedName() {
-        return String.format("%s:%s", logName, logIdentifier);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java
deleted file mode 100644
index 8858317..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.google.common.base.Optional;
-
-import java.net.URI;
-
-/**
- * Log Metadata for Reader
- */
-public class ZKLogMetadataForReader extends ZKLogMetadata {
-
-    /**
-     * Get the root path to store subscription infos of a log.
-     *
-     * @param uri
-     *          namespace of the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return subscribers root path
-     */
-    public static String getSubscribersPath(URI uri, String logName, String 
logIdentifier) {
-        return getLogComponentPath(uri, logName, logIdentifier, 
SUBSCRIBERS_PATH);
-    }
-
-    /**
-     * Get the path that stores subscription info for a 
<code>subscriberId</code> for a <code>log</code>.
-     *
-     * @param uri
-     *          namespace of the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @param subscriberId
-     *          subscriber id of the log
-     * @return subscriber's path
-     */
-    public static String getSubscriberPath(URI uri, String logName, String 
logIdentifier, String subscriberId) {
-        return String.format("%s/%s", getSubscribersPath(uri, logName, 
logIdentifier), subscriberId);
-    }
-
-    /**
-     * Create a metadata representation of a log for reader.
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return metadata representation of a log for reader
-     */
-    public static ZKLogMetadataForReader of(URI uri, String logName, String 
logIdentifier) {
-        return new ZKLogMetadataForReader(uri, logName, logIdentifier);
-    }
-
-    final static String SUBSCRIBERS_PATH = "/subscribers";
-
-    /**
-     * metadata representation of a log
-     *
-     * @param uri           namespace to store the log
-     * @param logName       name of the log
-     * @param logIdentifier identifier of the log
-     */
-    private ZKLogMetadataForReader(URI uri, String logName, String 
logIdentifier) {
-        super(uri, logName, logIdentifier);
-    }
-
-    /**
-     * Get the readlock path for the log or a subscriber of the log.
-     *
-     * @param subscriberId
-     *          subscriber id. it is optional.
-     * @return read lock path
-     */
-    public String getReadLockPath(Optional<String> subscriberId) {
-        if (subscriberId.isPresent()) {
-            return logRootPath + SUBSCRIBERS_PATH + "/" + subscriberId.get() + 
READ_LOCK_PATH;
-        } else {
-            return logRootPath + READ_LOCK_PATH;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
deleted file mode 100644
index 9a1548c..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import org.apache.bookkeeper.versioning.Versioned;
-
-import java.net.URI;
-
-/**
- * Log Metadata for writer
- */
-public class ZKLogMetadataForWriter extends ZKLogMetadata {
-
-    private final Versioned<byte[]> maxLSSNData;
-    private final Versioned<byte[]> maxTxIdData;
-    private final Versioned<byte[]> allocationData;
-
-    /**
-     * metadata representation of a log
-     *
-     * @param uri           namespace to store the log
-     * @param logName       name of the log
-     * @param logIdentifier identifier of the log
-     */
-    public ZKLogMetadataForWriter(URI uri,
-                                  String logName,
-                                  String logIdentifier,
-                                  Versioned<byte[]> maxLSSNData,
-                                  Versioned<byte[]> maxTxIdData,
-                                  Versioned<byte[]> allocationData) {
-        super(uri, logName, logIdentifier);
-        this.maxLSSNData = maxLSSNData;
-        this.maxTxIdData = maxTxIdData;
-        this.allocationData = allocationData;
-    }
-
-    public Versioned<byte[]> getMaxLSSNData() {
-        return maxLSSNData;
-    }
-
-    public Versioned<byte[]> getMaxTxIdData() {
-        return maxTxIdData;
-    }
-
-    public Versioned<byte[]> getAllocationData() {
-        return allocationData;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index d89dddb..c76a5a5 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -38,6 +38,9 @@ import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.metadata.LogMetadata;
+import com.twitter.distributedlog.metadata.LogMetadataForReader;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.SchedulerUtils;
@@ -75,7 +78,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+import static com.twitter.distributedlog.metadata.LogMetadata.*;
 
 /**
  * zookeeper based {@link LogStreamMetadataStore}
@@ -172,7 +175,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
 
     @Override
     public Future<Void> logExists(URI uri, final String logName) {
-        final String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath(
+        final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                 uri, logName, conf.getUnpartitionedStreamName());
         final Promise<Void> promise = new Promise<Void>();
         try {
@@ -221,7 +224,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
     //
 
     @Override
-    public DistributedLock createWriteLock(ZKLogMetadataForWriter metadata) {
+    public DistributedLock createWriteLock(LogMetadataForWriter metadata) {
         return new ZKDistributedLock(
                 getLockStateExecutor(true),
                 getLockFactory(true),
@@ -234,7 +237,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
     // Create Read Lock
     //
 
-    private Future<Void> ensureReadLockPathExist(final ZKLogMetadata 
logMetadata,
+    private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
                                                  final String readLockPath) {
         final Promise<Void> promise = new Promise<Void>();
         promise.setInterruptHandler(new com.twitter.util.Function<Throwable, 
BoxedUnit>() {
@@ -274,7 +277,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
     }
 
     @Override
-    public Future<DistributedLock> createReadLock(final ZKLogMetadataForReader 
metadata,
+    public Future<DistributedLock> createReadLock(final LogMetadataForReader 
metadata,
                                                   Optional<String> readerId) {
         final String readLockPath = metadata.getReadLockPath(readerId);
         return ensureReadLockPathExist(metadata, readLockPath).flatMap(
@@ -492,11 +495,11 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
         }, null);
     }
 
-    static ZKLogMetadataForWriter processLogMetadatas(URI uri,
-                                                      String logName,
-                                                      String logIdentifier,
-                                                      List<Versioned<byte[]>> 
metadatas,
-                                                      boolean ownAllocator)
+    static LogMetadataForWriter processLogMetadatas(URI uri,
+                                                    String logName,
+                                                    String logIdentifier,
+                                                    List<Versioned<byte[]>> 
metadatas,
+                                                    boolean ownAllocator)
             throws UnexpectedException {
         try {
             // max id
@@ -526,7 +529,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
             } else {
                 allocationData = new Versioned<byte[]>(null, null);
             }
-            return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
+            return new LogMetadataForWriter(uri, logName, logIdentifier,
                     maxLSSNData, maxTxnIdData, allocationData);
         } catch (IllegalArgumentException iae) {
             throw new UnexpectedException("Invalid log " + logName, iae);
@@ -535,13 +538,13 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
         }
     }
 
-    static Future<ZKLogMetadataForWriter> getLog(final URI uri,
-                                                 final String logName,
-                                                 final String logIdentifier,
-                                                 final ZooKeeperClient 
zooKeeperClient,
-                                                 final boolean ownAllocator,
-                                                 final boolean 
createIfNotExists) {
-        final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, 
logIdentifier);
+    static Future<LogMetadataForWriter> getLog(final URI uri,
+                                               final String logName,
+                                               final String logIdentifier,
+                                               final ZooKeeperClient 
zooKeeperClient,
+                                               final boolean ownAllocator,
+                                               final boolean 
createIfNotExists) {
+        final String logRootPath = LogMetadata.getLogRootPath(uri, logName, 
logIdentifier);
         try {
             PathUtils.validatePath(logRootPath);
         } catch (IllegalArgumentException e) {
@@ -561,9 +564,9 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
                                     ownAllocator, createIfNotExists, promise);
                             return promise;
                         }
-                    }).map(new ExceptionalFunction<List<Versioned<byte[]>>, 
ZKLogMetadataForWriter>() {
+                    }).map(new ExceptionalFunction<List<Versioned<byte[]>>, 
LogMetadataForWriter>() {
                         @Override
-                        public ZKLogMetadataForWriter 
applyE(List<Versioned<byte[]>> metadatas) throws DLException {
+                        public LogMetadataForWriter 
applyE(List<Versioned<byte[]>> metadatas) throws DLException {
                             return processLogMetadatas(
                                     uri,
                                     logName,
@@ -581,10 +584,10 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
     }
 
     @Override
-    public Future<ZKLogMetadataForWriter> getLog(final URI uri,
-                                                 final String logName,
-                                                 final boolean ownAllocator,
-                                                 final boolean 
createIfNotExists) {
+    public Future<LogMetadataForWriter> getLog(final URI uri,
+                                               final String logName,
+                                               final boolean ownAllocator,
+                                               final boolean 
createIfNotExists) {
         return getLog(
                 uri,
                 logName,
@@ -602,7 +605,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
     public Future<Void> deleteLog(URI uri, final String logName) {
         final Promise<Void> promise = new Promise<Void>();
         try {
-            String streamPath = ZKLogMetadata.getLogStreamPath(uri, logName);
+            String streamPath = LogMetadata.getLogStreamPath(uri, logName);
             ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new 
AsyncCallback.VoidCallback() {
                 @Override
                 public void processResult(int rc, String path, Object ctx) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
index 5144634..dda76e5 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
@@ -20,8 +20,8 @@ package com.twitter.distributedlog.logsegment;
 import com.google.common.annotations.Beta;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.metadata.LogMetadata;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.util.Transaction;
 import com.twitter.distributedlog.util.Transaction.OpListener;
 import com.twitter.util.Future;
@@ -62,7 +62,7 @@ public interface LogSegmentMetadataStore extends Closeable {
      *          listener on the result to this operation
      */
     void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                          ZKLogMetadata logMetadata,
+                                          LogMetadata logMetadata,
                                           Versioned<Long> sequenceNumber,
                                           OpListener<Version> listener);
 
@@ -79,7 +79,7 @@ public interface LogSegmentMetadataStore extends Closeable {
      *          listener on the result to this operation
      */
     void storeMaxTxnId(Transaction<Object> txn,
-                       ZKLogMetadataForWriter logMetadata,
+                       LogMetadataForWriter logMetadata,
                        Versioned<Long> transactionId,
                        OpListener<Version> listener);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
new file mode 100644
index 0000000..c878d68
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.metadata;
+
+import java.net.URI;
+
+/**
+ * Class to represent the layout and metadata of the zookeeper-based log 
metadata
+ */
+public class LogMetadata {
+
+    protected static String getLogComponentPath(URI uri, String logName, 
String logIdentifier, String component) {
+        return String.format("%s/%s/%s%s", uri.getPath(), logName, 
logIdentifier, component);
+    }
+
+    /**
+     * Get the top stream path for a given log.
+     *
+     * @param uri namespace to store the log
+     * @param logName name of the log
+     * @return top stream path
+     */
+    public static String getLogStreamPath(URI uri, String logName) {
+        return String.format("%s/%s", uri.getPath(), logName);
+    }
+
+    /**
+     * Get the log root path for a given log.
+     *
+     * @param uri
+     *          namespace to store the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     * @return log root path
+     */
+    public static String getLogRootPath(URI uri, String logName, String 
logIdentifier) {
+        return getLogComponentPath(uri, logName, logIdentifier, "");
+    }
+
+    /**
+     * Get the logsegments root path for a given log.
+     *
+     * @param uri
+     *          namespace to store the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     * @return logsegments root path
+     */
+    public static String getLogSegmentsPath(URI uri, String logName, String 
logIdentifier) {
+        return getLogComponentPath(uri, logName, logIdentifier, 
LOGSEGMENTS_PATH);
+    }
+
+    public static final int LAYOUT_VERSION = -1;
+    public final static String LOGSEGMENTS_PATH = "/ledgers";
+    public final static String VERSION_PATH = "/version";
+    // writer znodes
+    public final static String MAX_TXID_PATH = "/maxtxid";
+    public final static String LOCK_PATH = "/lock";
+    public final static String ALLOCATION_PATH = "/allocation";
+    // reader znodes
+    public final static String READ_LOCK_PATH = "/readLock";
+
+    protected final URI uri;
+    protected final String logName;
+    protected final String logIdentifier;
+
+    // Root path of the log
+    protected final String logRootPath;
+    // Components
+    protected final String logSegmentsPath;
+    protected final String lockPath;
+    protected final String maxTxIdPath;
+    protected final String allocationPath;
+
+    /**
+     * metadata representation of a log
+     *
+     * @param uri
+     *          namespace to store the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     */
+    protected LogMetadata(URI uri,
+                          String logName,
+                          String logIdentifier) {
+        this.uri = uri;
+        this.logName = logName;
+        this.logIdentifier = logIdentifier;
+        this.logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        this.logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
+        this.lockPath = logRootPath + LOCK_PATH;
+        this.maxTxIdPath = logRootPath + MAX_TXID_PATH;
+        this.allocationPath = logRootPath + ALLOCATION_PATH;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public String getLogName() {
+        return logName;
+    }
+
+    /**
+     * Get the root path of the log.
+     *
+     * @return root path of the log.
+     */
+    public String getLogRootPath() {
+        return logRootPath;
+    }
+
+    /**
+     * Get the root path for log segments.
+     *
+     * @return root path for log segments
+     */
+    public String getLogSegmentsPath() {
+        return this.logSegmentsPath;
+    }
+
+    /**
+     * Get the path for a log segment of the log.
+     *
+     * @param segmentName
+     *          segment name
+     * @return path for the log segment
+     */
+    public String getLogSegmentPath(String segmentName) {
+        return this.logSegmentsPath + "/" + segmentName;
+    }
+
+    public String getLockPath() {
+        return lockPath;
+    }
+
+    public String getMaxTxIdPath() {
+        return maxTxIdPath;
+    }
+
+    public String getAllocationPath() {
+        return allocationPath;
+    }
+
+    /**
+     * Get the fully qualified name of the log.
+     *
+     * @return fully qualified name
+     */
+    public String getFullyQualifiedName() {
+        return String.format("%s:%s", logName, logIdentifier);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
new file mode 100644
index 0000000..ff6bfca
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.metadata;
+
+import com.google.common.base.Optional;
+
+import java.net.URI;
+
+/**
+ * Log Metadata for Reader
+ */
+public class LogMetadataForReader extends LogMetadata {
+
+    /**
+     * Get the root path to store subscription infos of a log.
+     *
+     * @param uri
+     *          namespace of the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     * @return subscribers root path
+     */
+    public static String getSubscribersPath(URI uri, String logName, String 
logIdentifier) {
+        return getLogComponentPath(uri, logName, logIdentifier, 
SUBSCRIBERS_PATH);
+    }
+
+    /**
+     * Get the path that stores subscription info for a 
<code>subscriberId</code> for a <code>log</code>.
+     *
+     * @param uri
+     *          namespace of the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     * @param subscriberId
+     *          subscriber id of the log
+     * @return subscriber's path
+     */
+    public static String getSubscriberPath(URI uri, String logName, String 
logIdentifier, String subscriberId) {
+        return String.format("%s/%s", getSubscribersPath(uri, logName, 
logIdentifier), subscriberId);
+    }
+
+    /**
+     * Create a metadata representation of a log for reader.
+     *
+     * @param uri
+     *          namespace to store the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     * @return metadata representation of a log for reader
+     */
+    public static LogMetadataForReader of(URI uri, String logName, String 
logIdentifier) {
+        return new LogMetadataForReader(uri, logName, logIdentifier);
+    }
+
+    final static String SUBSCRIBERS_PATH = "/subscribers";
+
+    /**
+     * metadata representation of a log
+     *
+     * @param uri           namespace to store the log
+     * @param logName       name of the log
+     * @param logIdentifier identifier of the log
+     */
+    private LogMetadataForReader(URI uri, String logName, String 
logIdentifier) {
+        super(uri, logName, logIdentifier);
+    }
+
+    /**
+     * Get the readlock path for the log or a subscriber of the log.
+     *
+     * @param subscriberId
+     *          subscriber id. it is optional.
+     * @return read lock path
+     */
+    public String getReadLockPath(Optional<String> subscriberId) {
+        if (subscriberId.isPresent()) {
+            return logRootPath + SUBSCRIBERS_PATH + "/" + subscriberId.get() + 
READ_LOCK_PATH;
+        } else {
+            return logRootPath + READ_LOCK_PATH;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
new file mode 100644
index 0000000..2284cbb
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.metadata;
+
+import org.apache.bookkeeper.versioning.Versioned;
+
+import java.net.URI;
+
+/**
+ * Log Metadata for writer
+ */
+public class LogMetadataForWriter extends LogMetadata {
+
+    private final Versioned<byte[]> maxLSSNData;
+    private final Versioned<byte[]> maxTxIdData;
+    private final Versioned<byte[]> allocationData;
+
+    /**
+     * metadata representation of a log
+     *
+     * @param uri           namespace to store the log
+     * @param logName       name of the log
+     * @param logIdentifier identifier of the log
+     */
+    public LogMetadataForWriter(URI uri,
+                                String logName,
+                                String logIdentifier,
+                                Versioned<byte[]> maxLSSNData,
+                                Versioned<byte[]> maxTxIdData,
+                                Versioned<byte[]> allocationData) {
+        super(uri, logName, logIdentifier);
+        this.maxLSSNData = maxLSSNData;
+        this.maxTxIdData = maxTxIdData;
+        this.allocationData = allocationData;
+    }
+
+    public Versioned<byte[]> getMaxLSSNData() {
+        return maxLSSNData;
+    }
+
+    public Versioned<byte[]> getMaxTxIdData() {
+        return maxTxIdData;
+    }
+
+    public Versioned<byte[]> getAllocationData() {
+        return allocationData;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
index db7812e..7242a5e 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
@@ -19,8 +19,6 @@ package com.twitter.distributedlog.metadata;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.util.PermitManager;
@@ -61,7 +59,7 @@ public interface LogStreamMetadataStore extends Closeable {
      * @param readerId the reader id used for lock
      * @return the read lock
      */
-    Future<DistributedLock> createReadLock(ZKLogMetadataForReader metadata,
+    Future<DistributedLock> createReadLock(LogMetadataForReader metadata,
                                            Optional<String> readerId);
 
     /**
@@ -70,7 +68,7 @@ public interface LogStreamMetadataStore extends Closeable {
      * @param metadata the metadata for a log stream
      * @return the write lock
      */
-    DistributedLock createWriteLock(ZKLogMetadataForWriter metadata);
+    DistributedLock createWriteLock(LogMetadataForWriter metadata);
 
     /**
      * Create the metadata of a log.
@@ -81,10 +79,10 @@ public interface LogStreamMetadataStore extends Closeable {
      * @param createIfNotExists flag to create the stream if it doesn't exist
      * @return the metadata of the log
      */
-    Future<ZKLogMetadataForWriter> getLog(URI uri,
-                                          String streamName,
-                                          boolean ownAllocator,
-                                          boolean createIfNotExists);
+    Future<LogMetadataForWriter> getLog(URI uri,
+                                        String streamName,
+                                        boolean ownAllocator,
+                                        boolean createIfNotExists);
 
     /**
      * Delete the metadata of a log.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index 0217560..6c55014 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -36,8 +36,7 @@ import com.twitter.distributedlog.callback.ReadAheadCallback;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
+import com.twitter.distributedlog.metadata.LogMetadataForReader;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
@@ -108,7 +107,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, AsyncClosea
     private final String fullyQualifiedName;
     private final DistributedLogConfiguration conf;
     private final DynamicDistributedLogConfiguration dynConf;
-    private final ZKLogMetadataForReader logMetadata;
+    private final LogMetadataForReader logMetadata;
     private final BKLogHandler bkLedgerManager;
     private final boolean isHandleForReading;
     // Notification to notify readahead status
@@ -203,7 +202,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, AsyncClosea
 
     public ReadAheadWorker(DistributedLogConfiguration conf,
                            DynamicDistributedLogConfiguration dynConf,
-                           ZKLogMetadataForReader logMetadata,
+                           LogMetadataForReader logMetadata,
                            BKLogHandler ledgerManager,
                            OrderedScheduler scheduler,
                            LedgerHandleCache handleCache,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
index c8a1c74..96c33e2 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
@@ -50,8 +50,7 @@ import 
com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.LogMetadata;
 import com.twitter.distributedlog.metadata.MetadataUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
@@ -1107,7 +1106,7 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         }
 
         Map<Long, LogSegmentMetadata> segmentList = 
DLMTestUtil.readLogSegments(zookeeperClient,
-                ZKLogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
+                LogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
 
         LOG.info("Read segments before truncating first segment : {}", 
segmentList);
 
@@ -1116,7 +1115,7 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         
FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(1L)));
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
-                ZKLogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
+                LogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
 
         LOG.info("Read segments after truncated first segment : {}", 
segmentList);
 
@@ -1139,7 +1138,7 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         FutureUtils.result(updater.setLogSegmentActive(segmentList.get(1L)));
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
-                ZKLogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
+                LogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
 
         LOG.info("Read segments after marked first segment as active : {}", 
segmentList);
 
@@ -1147,7 +1146,7 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         
FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(2L)));
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
-                ZKLogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
+                LogMetadata.getLogSegmentsPath(uri, name, 
confLocal.getUnpartitionedStreamName()));
 
         LOG.info("Read segments after truncated second segment : {}", 
segmentList);
 
@@ -1190,7 +1189,7 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         }
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
-                ZKLogMetadata.getLogSegmentsPath(uri, name, 
conf.getUnpartitionedStreamName()));
+                LogMetadata.getLogSegmentsPath(uri, name, 
conf.getUnpartitionedStreamName()));
 
         
Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo()).getMinActiveDLSN().compareTo(truncDLSN)
 == 0);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
index b579b5e..bb67214 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
@@ -19,7 +19,7 @@ package com.twitter.distributedlog;
 
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.metadata.LogMetadata;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
 import com.twitter.distributedlog.util.DLUtils;
@@ -46,7 +46,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase 
{
     private static MaxLogSegmentSequenceNo 
getMaxLogSegmentSequenceNo(ZooKeeperClient zkc, URI uri, String streamName,
                                                                       
DistributedLogConfiguration conf) throws Exception {
         Stat stat = new Stat();
-        String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath(
+        String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                 uri, streamName, conf.getUnpartitionedStreamName());
         byte[] data = zkc.get().getData(logSegmentsPath, false, stat);
         Versioned<byte[]> maxLSSNData = new Versioned<byte[]>(data, new 
ZkVersion(stat.getVersion()));
@@ -55,7 +55,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase 
{
 
     private static void updateMaxLogSegmentSequenceNo(ZooKeeperClient zkc, URI 
uri, String streamName,
                                                       
DistributedLogConfiguration conf, byte[] data) throws Exception {
-        String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath(
+        String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                 uri, streamName, conf.getUnpartitionedStreamName());
         zkc.get().setData(logSegmentsPath, data, -1);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index 2a8c83b..46e8af0 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -27,8 +27,8 @@ import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClientUtils;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
-import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.metadata.LogMetadata;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
@@ -637,7 +637,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
         final Promise<Version> result = new Promise<Version>();
-        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
@@ -664,7 +664,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
-        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
@@ -702,7 +702,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
-        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath);
         lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
@@ -735,7 +735,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
         final Promise<Version> result = new Promise<Version>();
-        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
@@ -762,7 +762,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
-        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
@@ -800,7 +800,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
-        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
         lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java
deleted file mode 100644
index 85aad62..0000000
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.twitter.distributedlog.DLMTestUtil;
-import org.junit.Test;
-
-import java.net.URI;
-
-import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
-import static org.junit.Assert.*;
-
-public class TestZKLogMetadata {
-
-    @Test(timeout = 60000)
-    public void testGetPaths() throws Exception {
-        String rootPath = "/test-get-paths";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        String logRootPath = uri.getPath() + "/" + logName + "/" + 
logIdentifier;
-        String logSegmentName = "test-segment";
-
-        ZKLogMetadata logMetadata = new ZKLogMetadata(uri, logName, 
logIdentifier);
-        assertEquals("wrong log name", logName, logMetadata.getLogName());
-        assertEquals("wrong root path", logRootPath, 
logMetadata.getLogRootPath());
-        assertEquals("wrong log segments path",
-                logRootPath + LOGSEGMENTS_PATH,
-                logMetadata.getLogSegmentsPath());
-        assertEquals("wrong log segment path",
-                logRootPath + LOGSEGMENTS_PATH + "/" + logSegmentName,
-                logMetadata.getLogSegmentPath(logSegmentName));
-        assertEquals("wrong lock path",
-                logRootPath + LOCK_PATH, logMetadata.getLockPath());
-        assertEquals("wrong max tx id path",
-                logRootPath + MAX_TXID_PATH, logMetadata.getMaxTxIdPath());
-        assertEquals("wrong allocation path",
-                logRootPath + ALLOCATION_PATH, 
logMetadata.getAllocationPath());
-        assertEquals("wrong qualified name",
-                logName + ":" + logIdentifier, 
logMetadata.getFullyQualifiedName());
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index 9a08aa0..41544d6 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -23,6 +23,7 @@ import com.twitter.distributedlog.metadata.DLMetadata;
 import com.google.common.collect.Lists;
 import com.twitter.distributedlog.DLMTestUtil;
 import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
 import com.twitter.distributedlog.DistributedLogManager;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
 import java.net.URI;
 import java.util.List;
 
-import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+import static com.twitter.distributedlog.metadata.LogMetadata.*;
 import static 
com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
 import static org.junit.Assert.*;
 
@@ -167,7 +168,7 @@ public class TestZKLogStreamMetadataStore extends 
ZooKeeperClusterTestCase {
             zkc.get().delete(path, -1);
         }
 
-        ZKLogMetadataForWriter logMetadata =
+        LogMetadataForWriter logMetadata =
                 FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, 
ownAllocator, true));
 
         final String logRootPath = getLogRootPath(uri, logName, logIdentifier);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
index f14a217..9f29ebe 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
@@ -20,6 +20,8 @@ package com.twitter.distributedlog.impl.metadata;
 import com.google.common.collect.Lists;
 import com.twitter.distributedlog.DLMTestUtil;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.metadata.LogMetadata;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.util.DLUtils;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -88,7 +90,7 @@ public class TestZKLogStreamMetadataStoreUtils {
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new 
ZkVersion(1)),
-                new 
Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), 
null),
                 new Versioned<byte[]>(null, null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
@@ -104,7 +106,7 @@ public class TestZKLogStreamMetadataStoreUtils {
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new 
ZkVersion(1)),
-                new 
Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), 
null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(null, null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
@@ -121,7 +123,7 @@ public class TestZKLogStreamMetadataStoreUtils {
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new 
ZkVersion(1)),
-                new 
Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), 
null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(null, null));
@@ -139,7 +141,7 @@ public class TestZKLogStreamMetadataStoreUtils {
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new 
ZkVersion(1)),
-                new 
Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), 
null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new 
Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new 
ZkVersion(1)),
@@ -162,11 +164,11 @@ public class TestZKLogStreamMetadataStoreUtils {
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 maxTxnIdData,
-                new 
Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), 
null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 logSegmentsData);
-        ZKLogMetadataForWriter metadata =
+        LogMetadataForWriter metadata =
                 processLogMetadatas(uri, logName, logIdentifier, metadatas, 
false);
         assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
         assertTrue(logSegmentsData == metadata.getMaxLSSNData());
@@ -191,12 +193,12 @@ public class TestZKLogStreamMetadataStoreUtils {
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 maxTxnIdData,
-                new 
Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), 
null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 logSegmentsData,
                 allocationData);
-        ZKLogMetadataForWriter metadata =
+        LogMetadataForWriter metadata =
                 processLogMetadatas(uri, logName, logIdentifier, metadatas, 
true);
         assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
         assertTrue(logSegmentsData == metadata.getMaxLSSNData());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java
new file mode 100644
index 0000000..315b983
--- /dev/null
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.metadata;
+
+import com.twitter.distributedlog.DLMTestUtil;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static com.twitter.distributedlog.metadata.LogMetadata.*;
+import static org.junit.Assert.*;
+
+public class TestLogMetadata {
+
+    @Test(timeout = 60000)
+    public void testGetPaths() throws Exception {
+        String rootPath = "/test-get-paths";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        String logRootPath = uri.getPath() + "/" + logName + "/" + 
logIdentifier;
+        String logSegmentName = "test-segment";
+
+        LogMetadata logMetadata = new LogMetadata(uri, logName, logIdentifier);
+        assertEquals("wrong log name", logName, logMetadata.getLogName());
+        assertEquals("wrong root path", logRootPath, 
logMetadata.getLogRootPath());
+        assertEquals("wrong log segments path",
+                logRootPath + LOGSEGMENTS_PATH,
+                logMetadata.getLogSegmentsPath());
+        assertEquals("wrong log segment path",
+                logRootPath + LOGSEGMENTS_PATH + "/" + logSegmentName,
+                logMetadata.getLogSegmentPath(logSegmentName));
+        assertEquals("wrong lock path",
+                logRootPath + LOCK_PATH, logMetadata.getLockPath());
+        assertEquals("wrong max tx id path",
+                logRootPath + MAX_TXID_PATH, logMetadata.getMaxTxIdPath());
+        assertEquals("wrong allocation path",
+                logRootPath + ALLOCATION_PATH, 
logMetadata.getAllocationPath());
+        assertEquals("wrong qualified name",
+                logName + ":" + logIdentifier, 
logMetadata.getFullyQualifiedName());
+    }
+
+
+}

Reply via email to