DL-118: Stream metadata store followup - rename ZKLogMetadata* to LogMetadata*
As the followup change, this change is to remove 'ZK' from 'ZKLogMetadata*' class Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/dc4548be Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/dc4548be Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/dc4548be Branch: refs/heads/master Commit: dc4548bebcd8c8ce680fb14cfaf203dd6605fa29 Parents: b91d49a Author: Sijie Guo <sij...@twitter.com> Authored: Wed Nov 30 18:17:46 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:09:00 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKDistributedLogManager.java | 18 +- .../twitter/distributedlog/BKLogHandler.java | 6 +- .../distributedlog/BKLogReadHandler.java | 6 +- .../distributedlog/BKLogWriteHandler.java | 6 +- .../impl/ZKLogSegmentMetadataStore.java | 8 +- .../impl/metadata/ZKLogMetadata.java | 175 ------------------- .../impl/metadata/ZKLogMetadataForReader.java | 103 ----------- .../impl/metadata/ZKLogMetadataForWriter.java | 64 ------- .../impl/metadata/ZKLogStreamMetadataStore.java | 53 +++--- .../logsegment/LogSegmentMetadataStore.java | 8 +- .../distributedlog/metadata/LogMetadata.java | 175 +++++++++++++++++++ .../metadata/LogMetadataForReader.java | 103 +++++++++++ .../metadata/LogMetadataForWriter.java | 64 +++++++ .../metadata/LogStreamMetadataStore.java | 14 +- .../readahead/ReadAheadWorker.java | 7 +- .../TestBKDistributedLogManager.java | 13 +- .../distributedlog/TestLogSegmentsZK.java | 6 +- .../impl/TestZKLogSegmentMetadataStore.java | 16 +- .../impl/metadata/TestZKLogMetadata.java | 59 ------- .../metadata/TestZKLogStreamMetadataStore.java | 5 +- .../TestZKLogStreamMetadataStoreUtils.java | 18 +- .../metadata/TestLogMetadata.java | 59 +++++++ 22 files changed, 494 insertions(+), 492 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index 0a34caa..cd3f359 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -34,8 +34,8 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.CloseAsyncCloseableFunction; import com.twitter.distributedlog.function.GetVersionedValueFunction; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; +import com.twitter.distributedlog.metadata.LogMetadataForReader; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.lock.DistributedLock; @@ -501,7 +501,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL AsyncNotification notification, boolean deserializeRecordSet, boolean isHandleForReading) { - ZKLogMetadataForReader logMetadata = ZKLogMetadataForReader.of(uri, name, streamIdentifier); + LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier); return new BKLogReadHandler( logMetadata, subscriberId, @@ -524,7 +524,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL // Create Ledger Allocator - LedgerAllocator createLedgerAllocator(ZKLogMetadataForWriter logMetadata) throws IOException { + LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata) throws IOException { LedgerAllocator ledgerAllocatorDelegator; if (!dynConf.getEnableLedgerAllocatorPool()) { QuorumConfigProvider quorumConfigProvider = @@ -558,9 +558,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL name, ownAllocator, conf.getCreateStreamIfNotExists() || ownAllocator - ).flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() { + ).flatMap(new AbstractFunction1<LogMetadataForWriter, Future<BKLogWriteHandler>>() { @Override - public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter logMetadata) { + public Future<BKLogWriteHandler> apply(LogMetadataForWriter logMetadata) { Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>(); createWriteHandler(logMetadata, lockHandler, createPromise); return createPromise; @@ -568,7 +568,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL }); } - private void createWriteHandler(ZKLogMetadataForWriter logMetadata, + private void createWriteHandler(LogMetadataForWriter logMetadata, boolean lockHandler, final Promise<BKLogWriteHandler> createPromise) { // Build the locks @@ -1374,7 +1374,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL */ private SubscriptionStateStore getSubscriptionStateStoreInternal(String streamIdentifier, String subscriberId) { return new ZKSubscriptionStateStore(writerZKC, - ZKLogMetadataForReader.getSubscriberPath(uri, name, streamIdentifier, subscriberId)); + LogMetadataForReader.getSubscriberPath(uri, name, streamIdentifier, subscriberId)); } @Override @@ -1390,6 +1390,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL */ private SubscriptionsStore getSubscriptionsStoreInternal(String streamIdentifier) { return new ZKSubscriptionsStore(writerZKC, - ZKLogMetadataForReader.getSubscribersPath(uri, name, streamIdentifier)); + LogMetadataForReader.getSubscribersPath(uri, name, streamIdentifier)); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java index 4f138f2..caee864 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java @@ -23,7 +23,7 @@ import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.exceptions.LogEmptyException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; +import com.twitter.distributedlog.metadata.LogMetadata; import com.twitter.distributedlog.io.AsyncAbortable; import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; @@ -87,7 +87,7 @@ import java.util.concurrent.atomic.AtomicReference; public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class); - protected final ZKLogMetadata logMetadata; + protected final LogMetadata logMetadata; protected final DistributedLogConfiguration conf; protected final BookKeeperClient bookKeeperClient; protected final LogStreamMetadataStore streamMetadataStore; @@ -120,7 +120,7 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { /** * Construct a Bookkeeper journal manager. */ - BKLogHandler(ZKLogMetadata metadata, + BKLogHandler(LogMetadata metadata, DistributedLogConfiguration conf, BookKeeperClientBuilder bkcBuilder, LogStreamMetadataStore streamMetadataStore, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java index 1963172..9cfe1a6 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java @@ -35,7 +35,7 @@ import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; +import com.twitter.distributedlog.metadata.LogMetadataForReader; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; @@ -110,7 +110,7 @@ import javax.annotation.Nullable; class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); - protected final ZKLogMetadataForReader logMetadataForReader; + protected final LogMetadataForReader logMetadataForReader; protected final ReadAheadCache readAheadCache; protected final LedgerHandleCache handleCache; @@ -142,7 +142,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { /** * Construct a Bookkeeper journal manager. */ - BKLogReadHandler(ZKLogMetadataForReader logMetadata, + BKLogReadHandler(LogMetadataForReader logMetadata, Optional<String> subscriberId, DistributedLogConfiguration conf, DynamicDistributedLogConfiguration dynConf, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index f2e30ce..2e31ac8 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -30,7 +30,7 @@ import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.GetLastTxIdFunction; import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; @@ -89,7 +89,7 @@ import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_F class BKLogWriteHandler extends BKLogHandler { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); - protected final ZKLogMetadataForWriter logMetadataForWriter; + protected final LogMetadataForWriter logMetadataForWriter; protected final DistributedLock lock; protected final LedgerAllocator ledgerAllocator; protected final MaxTxId maxTxId; @@ -149,7 +149,7 @@ class BKLogWriteHandler extends BKLogHandler { /** * Construct a Bookkeeper journal manager. */ - BKLogWriteHandler(ZKLogMetadataForWriter logMetadata, + BKLogWriteHandler(LogMetadataForWriter logMetadata, DistributedLogConfiguration conf, BookKeeperClientBuilder bkcBuilder, LogStreamMetadataStore streamMetadataStore, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java index 1b831ea..2076dd8 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java @@ -25,8 +25,8 @@ import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; +import com.twitter.distributedlog.metadata.LogMetadata; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; @@ -222,7 +222,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn, - ZKLogMetadata logMetadata, + LogMetadata logMetadata, Versioned<Long> lssn, Transaction.OpListener<Version> listener) { Version version = lssn.getVersion(); @@ -236,7 +236,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override public void storeMaxTxnId(Transaction<Object> txn, - ZKLogMetadataForWriter logMetadata, + LogMetadataForWriter logMetadata, Versioned<Long> transactionId, Transaction.OpListener<Version> listener) { Version version = transactionId.getVersion(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java deleted file mode 100644 index 37beb16..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import java.net.URI; - -/** - * Class to represent the layout and metadata of the zookeeper-based log metadata - */ -public class ZKLogMetadata { - - protected static String getLogComponentPath(URI uri, String logName, String logIdentifier, String component) { - return String.format("%s/%s/%s%s", uri.getPath(), logName, logIdentifier, component); - } - - /** - * Get the top stream path for a given log. - * - * @param uri namespace to store the log - * @param logName name of the log - * @return top stream path - */ - public static String getLogStreamPath(URI uri, String logName) { - return String.format("%s/%s", uri.getPath(), logName); - } - - /** - * Get the log root path for a given log. - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return log root path - */ - public static String getLogRootPath(URI uri, String logName, String logIdentifier) { - return getLogComponentPath(uri, logName, logIdentifier, ""); - } - - /** - * Get the logsegments root path for a given log. - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return logsegments root path - */ - public static String getLogSegmentsPath(URI uri, String logName, String logIdentifier) { - return getLogComponentPath(uri, logName, logIdentifier, LOGSEGMENTS_PATH); - } - - protected static final int LAYOUT_VERSION = -1; - public final static String LOGSEGMENTS_PATH = "/ledgers"; - public final static String VERSION_PATH = "/version"; - // writer znodes - public final static String MAX_TXID_PATH = "/maxtxid"; - public final static String LOCK_PATH = "/lock"; - public final static String ALLOCATION_PATH = "/allocation"; - // reader znodes - public final static String READ_LOCK_PATH = "/readLock"; - - protected final URI uri; - protected final String logName; - protected final String logIdentifier; - - // Root path of the log - protected final String logRootPath; - // Components - protected final String logSegmentsPath; - protected final String lockPath; - protected final String maxTxIdPath; - protected final String allocationPath; - - /** - * metadata representation of a log - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - */ - protected ZKLogMetadata(URI uri, - String logName, - String logIdentifier) { - this.uri = uri; - this.logName = logName; - this.logIdentifier = logIdentifier; - this.logRootPath = getLogRootPath(uri, logName, logIdentifier); - this.logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; - this.lockPath = logRootPath + LOCK_PATH; - this.maxTxIdPath = logRootPath + MAX_TXID_PATH; - this.allocationPath = logRootPath + ALLOCATION_PATH; - } - - public URI getUri() { - return uri; - } - - public String getLogName() { - return logName; - } - - /** - * Get the root path of the log. - * - * @return root path of the log. - */ - public String getLogRootPath() { - return logRootPath; - } - - /** - * Get the root path for log segments. - * - * @return root path for log segments - */ - public String getLogSegmentsPath() { - return this.logSegmentsPath; - } - - /** - * Get the path for a log segment of the log. - * - * @param segmentName - * segment name - * @return path for the log segment - */ - public String getLogSegmentPath(String segmentName) { - return this.logSegmentsPath + "/" + segmentName; - } - - public String getLockPath() { - return lockPath; - } - - public String getMaxTxIdPath() { - return maxTxIdPath; - } - - public String getAllocationPath() { - return allocationPath; - } - - /** - * Get the fully qualified name of the log. - * - * @return fully qualified name - */ - public String getFullyQualifiedName() { - return String.format("%s:%s", logName, logIdentifier); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java deleted file mode 100644 index 8858317..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForReader.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import com.google.common.base.Optional; - -import java.net.URI; - -/** - * Log Metadata for Reader - */ -public class ZKLogMetadataForReader extends ZKLogMetadata { - - /** - * Get the root path to store subscription infos of a log. - * - * @param uri - * namespace of the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return subscribers root path - */ - public static String getSubscribersPath(URI uri, String logName, String logIdentifier) { - return getLogComponentPath(uri, logName, logIdentifier, SUBSCRIBERS_PATH); - } - - /** - * Get the path that stores subscription info for a <code>subscriberId</code> for a <code>log</code>. - * - * @param uri - * namespace of the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @param subscriberId - * subscriber id of the log - * @return subscriber's path - */ - public static String getSubscriberPath(URI uri, String logName, String logIdentifier, String subscriberId) { - return String.format("%s/%s", getSubscribersPath(uri, logName, logIdentifier), subscriberId); - } - - /** - * Create a metadata representation of a log for reader. - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return metadata representation of a log for reader - */ - public static ZKLogMetadataForReader of(URI uri, String logName, String logIdentifier) { - return new ZKLogMetadataForReader(uri, logName, logIdentifier); - } - - final static String SUBSCRIBERS_PATH = "/subscribers"; - - /** - * metadata representation of a log - * - * @param uri namespace to store the log - * @param logName name of the log - * @param logIdentifier identifier of the log - */ - private ZKLogMetadataForReader(URI uri, String logName, String logIdentifier) { - super(uri, logName, logIdentifier); - } - - /** - * Get the readlock path for the log or a subscriber of the log. - * - * @param subscriberId - * subscriber id. it is optional. - * @return read lock path - */ - public String getReadLockPath(Optional<String> subscriberId) { - if (subscriberId.isPresent()) { - return logRootPath + SUBSCRIBERS_PATH + "/" + subscriberId.get() + READ_LOCK_PATH; - } else { - return logRootPath + READ_LOCK_PATH; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java deleted file mode 100644 index 9a1548c..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import org.apache.bookkeeper.versioning.Versioned; - -import java.net.URI; - -/** - * Log Metadata for writer - */ -public class ZKLogMetadataForWriter extends ZKLogMetadata { - - private final Versioned<byte[]> maxLSSNData; - private final Versioned<byte[]> maxTxIdData; - private final Versioned<byte[]> allocationData; - - /** - * metadata representation of a log - * - * @param uri namespace to store the log - * @param logName name of the log - * @param logIdentifier identifier of the log - */ - public ZKLogMetadataForWriter(URI uri, - String logName, - String logIdentifier, - Versioned<byte[]> maxLSSNData, - Versioned<byte[]> maxTxIdData, - Versioned<byte[]> allocationData) { - super(uri, logName, logIdentifier); - this.maxLSSNData = maxLSSNData; - this.maxTxIdData = maxTxIdData; - this.allocationData = allocationData; - } - - public Versioned<byte[]> getMaxLSSNData() { - return maxLSSNData; - } - - public Versioned<byte[]> getMaxTxIdData() { - return maxTxIdData; - } - - public Versioned<byte[]> getAllocationData() { - return allocationData; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java index d89dddb..c76a5a5 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java @@ -38,6 +38,9 @@ import com.twitter.distributedlog.lock.ZKDistributedLock; import com.twitter.distributedlog.lock.ZKSessionLockFactory; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.metadata.LogStreamMetadataStore; +import com.twitter.distributedlog.metadata.LogMetadata; +import com.twitter.distributedlog.metadata.LogMetadataForReader; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.SchedulerUtils; @@ -75,7 +78,7 @@ import java.net.URI; import java.util.List; import java.util.concurrent.TimeUnit; -import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*; +import static com.twitter.distributedlog.metadata.LogMetadata.*; /** * zookeeper based {@link LogStreamMetadataStore} @@ -172,7 +175,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { @Override public Future<Void> logExists(URI uri, final String logName) { - final String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath( + final String logSegmentsPath = LogMetadata.getLogSegmentsPath( uri, logName, conf.getUnpartitionedStreamName()); final Promise<Void> promise = new Promise<Void>(); try { @@ -221,7 +224,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { // @Override - public DistributedLock createWriteLock(ZKLogMetadataForWriter metadata) { + public DistributedLock createWriteLock(LogMetadataForWriter metadata) { return new ZKDistributedLock( getLockStateExecutor(true), getLockFactory(true), @@ -234,7 +237,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { // Create Read Lock // - private Future<Void> ensureReadLockPathExist(final ZKLogMetadata logMetadata, + private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata, final String readLockPath) { final Promise<Void> promise = new Promise<Void>(); promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() { @@ -274,7 +277,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } @Override - public Future<DistributedLock> createReadLock(final ZKLogMetadataForReader metadata, + public Future<DistributedLock> createReadLock(final LogMetadataForReader metadata, Optional<String> readerId) { final String readLockPath = metadata.getReadLockPath(readerId); return ensureReadLockPathExist(metadata, readLockPath).flatMap( @@ -492,11 +495,11 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { }, null); } - static ZKLogMetadataForWriter processLogMetadatas(URI uri, - String logName, - String logIdentifier, - List<Versioned<byte[]>> metadatas, - boolean ownAllocator) + static LogMetadataForWriter processLogMetadatas(URI uri, + String logName, + String logIdentifier, + List<Versioned<byte[]>> metadatas, + boolean ownAllocator) throws UnexpectedException { try { // max id @@ -526,7 +529,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } else { allocationData = new Versioned<byte[]>(null, null); } - return new ZKLogMetadataForWriter(uri, logName, logIdentifier, + return new LogMetadataForWriter(uri, logName, logIdentifier, maxLSSNData, maxTxnIdData, allocationData); } catch (IllegalArgumentException iae) { throw new UnexpectedException("Invalid log " + logName, iae); @@ -535,13 +538,13 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } } - static Future<ZKLogMetadataForWriter> getLog(final URI uri, - final String logName, - final String logIdentifier, - final ZooKeeperClient zooKeeperClient, - final boolean ownAllocator, - final boolean createIfNotExists) { - final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier); + static Future<LogMetadataForWriter> getLog(final URI uri, + final String logName, + final String logIdentifier, + final ZooKeeperClient zooKeeperClient, + final boolean ownAllocator, + final boolean createIfNotExists) { + final String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier); try { PathUtils.validatePath(logRootPath); } catch (IllegalArgumentException e) { @@ -561,9 +564,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { ownAllocator, createIfNotExists, promise); return promise; } - }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() { + }).map(new ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter>() { @Override - public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException { + public LogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException { return processLogMetadatas( uri, logName, @@ -581,10 +584,10 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } @Override - public Future<ZKLogMetadataForWriter> getLog(final URI uri, - final String logName, - final boolean ownAllocator, - final boolean createIfNotExists) { + public Future<LogMetadataForWriter> getLog(final URI uri, + final String logName, + final boolean ownAllocator, + final boolean createIfNotExists) { return getLog( uri, logName, @@ -602,7 +605,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { public Future<Void> deleteLog(URI uri, final String logName) { final Promise<Void> promise = new Promise<Void>(); try { - String streamPath = ZKLogMetadata.getLogStreamPath(uri, logName); + String streamPath = LogMetadata.getLogStreamPath(uri, logName); ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java index 5144634..dda76e5 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java @@ -20,8 +20,8 @@ package com.twitter.distributedlog.logsegment; import com.google.common.annotations.Beta; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.callback.LogSegmentNamesListener; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; +import com.twitter.distributedlog.metadata.LogMetadata; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.util.Transaction; import com.twitter.distributedlog.util.Transaction.OpListener; import com.twitter.util.Future; @@ -62,7 +62,7 @@ public interface LogSegmentMetadataStore extends Closeable { * listener on the result to this operation */ void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn, - ZKLogMetadata logMetadata, + LogMetadata logMetadata, Versioned<Long> sequenceNumber, OpListener<Version> listener); @@ -79,7 +79,7 @@ public interface LogSegmentMetadataStore extends Closeable { * listener on the result to this operation */ void storeMaxTxnId(Transaction<Object> txn, - ZKLogMetadataForWriter logMetadata, + LogMetadataForWriter logMetadata, Versioned<Long> transactionId, OpListener<Version> listener); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java new file mode 100644 index 0000000..c878d68 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.metadata; + +import java.net.URI; + +/** + * Class to represent the layout and metadata of the zookeeper-based log metadata + */ +public class LogMetadata { + + protected static String getLogComponentPath(URI uri, String logName, String logIdentifier, String component) { + return String.format("%s/%s/%s%s", uri.getPath(), logName, logIdentifier, component); + } + + /** + * Get the top stream path for a given log. + * + * @param uri namespace to store the log + * @param logName name of the log + * @return top stream path + */ + public static String getLogStreamPath(URI uri, String logName) { + return String.format("%s/%s", uri.getPath(), logName); + } + + /** + * Get the log root path for a given log. + * + * @param uri + * namespace to store the log + * @param logName + * name of the log + * @param logIdentifier + * identifier of the log + * @return log root path + */ + public static String getLogRootPath(URI uri, String logName, String logIdentifier) { + return getLogComponentPath(uri, logName, logIdentifier, ""); + } + + /** + * Get the logsegments root path for a given log. + * + * @param uri + * namespace to store the log + * @param logName + * name of the log + * @param logIdentifier + * identifier of the log + * @return logsegments root path + */ + public static String getLogSegmentsPath(URI uri, String logName, String logIdentifier) { + return getLogComponentPath(uri, logName, logIdentifier, LOGSEGMENTS_PATH); + } + + public static final int LAYOUT_VERSION = -1; + public final static String LOGSEGMENTS_PATH = "/ledgers"; + public final static String VERSION_PATH = "/version"; + // writer znodes + public final static String MAX_TXID_PATH = "/maxtxid"; + public final static String LOCK_PATH = "/lock"; + public final static String ALLOCATION_PATH = "/allocation"; + // reader znodes + public final static String READ_LOCK_PATH = "/readLock"; + + protected final URI uri; + protected final String logName; + protected final String logIdentifier; + + // Root path of the log + protected final String logRootPath; + // Components + protected final String logSegmentsPath; + protected final String lockPath; + protected final String maxTxIdPath; + protected final String allocationPath; + + /** + * metadata representation of a log + * + * @param uri + * namespace to store the log + * @param logName + * name of the log + * @param logIdentifier + * identifier of the log + */ + protected LogMetadata(URI uri, + String logName, + String logIdentifier) { + this.uri = uri; + this.logName = logName; + this.logIdentifier = logIdentifier; + this.logRootPath = getLogRootPath(uri, logName, logIdentifier); + this.logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; + this.lockPath = logRootPath + LOCK_PATH; + this.maxTxIdPath = logRootPath + MAX_TXID_PATH; + this.allocationPath = logRootPath + ALLOCATION_PATH; + } + + public URI getUri() { + return uri; + } + + public String getLogName() { + return logName; + } + + /** + * Get the root path of the log. + * + * @return root path of the log. + */ + public String getLogRootPath() { + return logRootPath; + } + + /** + * Get the root path for log segments. + * + * @return root path for log segments + */ + public String getLogSegmentsPath() { + return this.logSegmentsPath; + } + + /** + * Get the path for a log segment of the log. + * + * @param segmentName + * segment name + * @return path for the log segment + */ + public String getLogSegmentPath(String segmentName) { + return this.logSegmentsPath + "/" + segmentName; + } + + public String getLockPath() { + return lockPath; + } + + public String getMaxTxIdPath() { + return maxTxIdPath; + } + + public String getAllocationPath() { + return allocationPath; + } + + /** + * Get the fully qualified name of the log. + * + * @return fully qualified name + */ + public String getFullyQualifiedName() { + return String.format("%s:%s", logName, logIdentifier); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java new file mode 100644 index 0000000..ff6bfca --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.metadata; + +import com.google.common.base.Optional; + +import java.net.URI; + +/** + * Log Metadata for Reader + */ +public class LogMetadataForReader extends LogMetadata { + + /** + * Get the root path to store subscription infos of a log. + * + * @param uri + * namespace of the log + * @param logName + * name of the log + * @param logIdentifier + * identifier of the log + * @return subscribers root path + */ + public static String getSubscribersPath(URI uri, String logName, String logIdentifier) { + return getLogComponentPath(uri, logName, logIdentifier, SUBSCRIBERS_PATH); + } + + /** + * Get the path that stores subscription info for a <code>subscriberId</code> for a <code>log</code>. + * + * @param uri + * namespace of the log + * @param logName + * name of the log + * @param logIdentifier + * identifier of the log + * @param subscriberId + * subscriber id of the log + * @return subscriber's path + */ + public static String getSubscriberPath(URI uri, String logName, String logIdentifier, String subscriberId) { + return String.format("%s/%s", getSubscribersPath(uri, logName, logIdentifier), subscriberId); + } + + /** + * Create a metadata representation of a log for reader. + * + * @param uri + * namespace to store the log + * @param logName + * name of the log + * @param logIdentifier + * identifier of the log + * @return metadata representation of a log for reader + */ + public static LogMetadataForReader of(URI uri, String logName, String logIdentifier) { + return new LogMetadataForReader(uri, logName, logIdentifier); + } + + final static String SUBSCRIBERS_PATH = "/subscribers"; + + /** + * metadata representation of a log + * + * @param uri namespace to store the log + * @param logName name of the log + * @param logIdentifier identifier of the log + */ + private LogMetadataForReader(URI uri, String logName, String logIdentifier) { + super(uri, logName, logIdentifier); + } + + /** + * Get the readlock path for the log or a subscriber of the log. + * + * @param subscriberId + * subscriber id. it is optional. + * @return read lock path + */ + public String getReadLockPath(Optional<String> subscriberId) { + if (subscriberId.isPresent()) { + return logRootPath + SUBSCRIBERS_PATH + "/" + subscriberId.get() + READ_LOCK_PATH; + } else { + return logRootPath + READ_LOCK_PATH; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java new file mode 100644 index 0000000..2284cbb --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.metadata; + +import org.apache.bookkeeper.versioning.Versioned; + +import java.net.URI; + +/** + * Log Metadata for writer + */ +public class LogMetadataForWriter extends LogMetadata { + + private final Versioned<byte[]> maxLSSNData; + private final Versioned<byte[]> maxTxIdData; + private final Versioned<byte[]> allocationData; + + /** + * metadata representation of a log + * + * @param uri namespace to store the log + * @param logName name of the log + * @param logIdentifier identifier of the log + */ + public LogMetadataForWriter(URI uri, + String logName, + String logIdentifier, + Versioned<byte[]> maxLSSNData, + Versioned<byte[]> maxTxIdData, + Versioned<byte[]> allocationData) { + super(uri, logName, logIdentifier); + this.maxLSSNData = maxLSSNData; + this.maxTxIdData = maxTxIdData; + this.allocationData = allocationData; + } + + public Versioned<byte[]> getMaxLSSNData() { + return maxLSSNData; + } + + public Versioned<byte[]> getMaxTxIdData() { + return maxTxIdData; + } + + public Versioned<byte[]> getAllocationData() { + return allocationData; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java index db7812e..7242a5e 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java @@ -19,8 +19,6 @@ package com.twitter.distributedlog.metadata; import com.google.common.annotations.Beta; import com.google.common.base.Optional; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.util.PermitManager; @@ -61,7 +59,7 @@ public interface LogStreamMetadataStore extends Closeable { * @param readerId the reader id used for lock * @return the read lock */ - Future<DistributedLock> createReadLock(ZKLogMetadataForReader metadata, + Future<DistributedLock> createReadLock(LogMetadataForReader metadata, Optional<String> readerId); /** @@ -70,7 +68,7 @@ public interface LogStreamMetadataStore extends Closeable { * @param metadata the metadata for a log stream * @return the write lock */ - DistributedLock createWriteLock(ZKLogMetadataForWriter metadata); + DistributedLock createWriteLock(LogMetadataForWriter metadata); /** * Create the metadata of a log. @@ -81,10 +79,10 @@ public interface LogStreamMetadataStore extends Closeable { * @param createIfNotExists flag to create the stream if it doesn't exist * @return the metadata of the log */ - Future<ZKLogMetadataForWriter> getLog(URI uri, - String streamName, - boolean ownAllocator, - boolean createIfNotExists); + Future<LogMetadataForWriter> getLog(URI uri, + String streamName, + boolean ownAllocator, + boolean createIfNotExists); /** * Delete the metadata of a log. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java index 0217560..6c55014 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java @@ -36,8 +36,7 @@ import com.twitter.distributedlog.callback.ReadAheadCallback; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; +import com.twitter.distributedlog.metadata.LogMetadataForReader; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.logsegment.LogSegmentFilter; @@ -108,7 +107,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea private final String fullyQualifiedName; private final DistributedLogConfiguration conf; private final DynamicDistributedLogConfiguration dynConf; - private final ZKLogMetadataForReader logMetadata; + private final LogMetadataForReader logMetadata; private final BKLogHandler bkLedgerManager; private final boolean isHandleForReading; // Notification to notify readahead status @@ -203,7 +202,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea public ReadAheadWorker(DistributedLogConfiguration conf, DynamicDistributedLogConfiguration dynConf, - ZKLogMetadataForReader logMetadata, + LogMetadataForReader logMetadata, BKLogHandler ledgerManager, OrderedScheduler scheduler, LedgerHandleCache handleCache, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java index c8a1c74..96c33e2 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java @@ -50,8 +50,7 @@ import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.LogRecordTooLongException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.metadata.LogMetadata; import com.twitter.distributedlog.metadata.MetadataUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.namespace.DistributedLogNamespace; @@ -1107,7 +1106,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } Map<Long, LogSegmentMetadata> segmentList = DLMTestUtil.readLogSegments(zookeeperClient, - ZKLogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); + LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); LOG.info("Read segments before truncating first segment : {}", segmentList); @@ -1116,7 +1115,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(1L))); segmentList = DLMTestUtil.readLogSegments(zookeeperClient, - ZKLogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); + LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); LOG.info("Read segments after truncated first segment : {}", segmentList); @@ -1139,7 +1138,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { FutureUtils.result(updater.setLogSegmentActive(segmentList.get(1L))); segmentList = DLMTestUtil.readLogSegments(zookeeperClient, - ZKLogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); + LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); LOG.info("Read segments after marked first segment as active : {}", segmentList); @@ -1147,7 +1146,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(2L))); segmentList = DLMTestUtil.readLogSegments(zookeeperClient, - ZKLogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); + LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); LOG.info("Read segments after truncated second segment : {}", segmentList); @@ -1190,7 +1189,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } segmentList = DLMTestUtil.readLogSegments(zookeeperClient, - ZKLogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName())); + LogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName())); Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo()).getMinActiveDLSN().compareTo(truncDLSN) == 0); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java index b579b5e..bb67214 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java @@ -19,7 +19,7 @@ package com.twitter.distributedlog; import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; +import com.twitter.distributedlog.metadata.LogMetadata; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; import com.twitter.distributedlog.util.DLUtils; @@ -46,7 +46,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { private static MaxLogSegmentSequenceNo getMaxLogSegmentSequenceNo(ZooKeeperClient zkc, URI uri, String streamName, DistributedLogConfiguration conf) throws Exception { Stat stat = new Stat(); - String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath( + String logSegmentsPath = LogMetadata.getLogSegmentsPath( uri, streamName, conf.getUnpartitionedStreamName()); byte[] data = zkc.get().getData(logSegmentsPath, false, stat); Versioned<byte[]> maxLSSNData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion())); @@ -55,7 +55,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { private static void updateMaxLogSegmentSequenceNo(ZooKeeperClient zkc, URI uri, String streamName, DistributedLogConfiguration conf, byte[] data) throws Exception { - String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath( + String logSegmentsPath = LogMetadata.getLogSegmentsPath( uri, streamName, conf.getUnpartitionedStreamName()); zkc.get().setData(logSegmentsPath, data, -1); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java index 2a8c83b..46e8af0 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java @@ -27,8 +27,8 @@ import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClientUtils; import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; -import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; +import com.twitter.distributedlog.metadata.LogMetadata; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; @@ -637,7 +637,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0)); final Promise<Version> result = new Promise<Version>(); - ZKLogMetadata metadata = mock(ZKLogMetadata.class); + LogMetadata metadata = mock(LogMetadata.class); when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath); lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @@ -664,7 +664,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); - ZKLogMetadata metadata = mock(ZKLogMetadata.class); + LogMetadata metadata = mock(LogMetadata.class); when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath); lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @@ -702,7 +702,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); String nonExistentPath = rootZkPath + "/non-existent"; - ZKLogMetadata metadata = mock(ZKLogMetadata.class); + LogMetadata metadata = mock(LogMetadata.class); when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath); lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @@ -735,7 +735,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0)); final Promise<Version> result = new Promise<Version>(); - ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class); + LogMetadataForWriter metadata = mock(LogMetadataForWriter.class); when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath); lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @@ -762,7 +762,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); - ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class); + LogMetadataForWriter metadata = mock(LogMetadataForWriter.class); when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath); lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @@ -800,7 +800,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); String nonExistentPath = rootZkPath + "/non-existent"; - ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class); + LogMetadataForWriter metadata = mock(LogMetadataForWriter.class); when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath); lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java deleted file mode 100644 index 85aad62..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadata.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import com.twitter.distributedlog.DLMTestUtil; -import org.junit.Test; - -import java.net.URI; - -import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*; -import static org.junit.Assert.*; - -public class TestZKLogMetadata { - - @Test(timeout = 60000) - public void testGetPaths() throws Exception { - String rootPath = "/test-get-paths"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - String logRootPath = uri.getPath() + "/" + logName + "/" + logIdentifier; - String logSegmentName = "test-segment"; - - ZKLogMetadata logMetadata = new ZKLogMetadata(uri, logName, logIdentifier); - assertEquals("wrong log name", logName, logMetadata.getLogName()); - assertEquals("wrong root path", logRootPath, logMetadata.getLogRootPath()); - assertEquals("wrong log segments path", - logRootPath + LOGSEGMENTS_PATH, - logMetadata.getLogSegmentsPath()); - assertEquals("wrong log segment path", - logRootPath + LOGSEGMENTS_PATH + "/" + logSegmentName, - logMetadata.getLogSegmentPath(logSegmentName)); - assertEquals("wrong lock path", - logRootPath + LOCK_PATH, logMetadata.getLockPath()); - assertEquals("wrong max tx id path", - logRootPath + MAX_TXID_PATH, logMetadata.getMaxTxIdPath()); - assertEquals("wrong allocation path", - logRootPath + ALLOCATION_PATH, logMetadata.getAllocationPath()); - assertEquals("wrong qualified name", - logName + ":" + logIdentifier, logMetadata.getFullyQualifiedName()); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java index 9a08aa0..41544d6 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java @@ -23,6 +23,7 @@ import com.twitter.distributedlog.metadata.DLMetadata; import com.google.common.collect.Lists; import com.twitter.distributedlog.DLMTestUtil; import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; import com.twitter.distributedlog.DistributedLogManager; @@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory; import java.net.URI; import java.util.List; -import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*; +import static com.twitter.distributedlog.metadata.LogMetadata.*; import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*; import static org.junit.Assert.*; @@ -167,7 +168,7 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { zkc.get().delete(path, -1); } - ZKLogMetadataForWriter logMetadata = + LogMetadataForWriter logMetadata = FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true)); final String logRootPath = getLogRootPath(uri, logName, logIdentifier); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java index f14a217..9f29ebe 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java @@ -20,6 +20,8 @@ package com.twitter.distributedlog.impl.metadata; import com.google.common.collect.Lists; import com.twitter.distributedlog.DLMTestUtil; import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.metadata.LogMetadata; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.util.DLUtils; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Versioned; @@ -88,7 +90,7 @@ public class TestZKLogStreamMetadataStoreUtils { new Versioned<byte[]>(null, null), new Versioned<byte[]>(null, null), new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null), new Versioned<byte[]>(null, null)); processLogMetadatas(uri, logName, logIdentifier, metadatas, false); } @@ -104,7 +106,7 @@ public class TestZKLogStreamMetadataStoreUtils { new Versioned<byte[]>(null, null), new Versioned<byte[]>(null, null), new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), new Versioned<byte[]>(null, null)); processLogMetadatas(uri, logName, logIdentifier, metadatas, false); @@ -121,7 +123,7 @@ public class TestZKLogStreamMetadataStoreUtils { new Versioned<byte[]>(null, null), new Versioned<byte[]>(null, null), new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), new Versioned<byte[]>(null, null)); @@ -139,7 +141,7 @@ public class TestZKLogStreamMetadataStoreUtils { new Versioned<byte[]>(null, null), new Versioned<byte[]>(null, null), new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)), @@ -162,11 +164,11 @@ public class TestZKLogStreamMetadataStoreUtils { new Versioned<byte[]>(null, null), new Versioned<byte[]>(null, null), maxTxnIdData, - new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), logSegmentsData); - ZKLogMetadataForWriter metadata = + LogMetadataForWriter metadata = processLogMetadatas(uri, logName, logIdentifier, metadatas, false); assertTrue(maxTxnIdData == metadata.getMaxTxIdData()); assertTrue(logSegmentsData == metadata.getMaxLSSNData()); @@ -191,12 +193,12 @@ public class TestZKLogStreamMetadataStoreUtils { new Versioned<byte[]>(null, null), new Versioned<byte[]>(null, null), maxTxnIdData, - new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), new Versioned<byte[]>(new byte[0], new ZkVersion(1)), logSegmentsData, allocationData); - ZKLogMetadataForWriter metadata = + LogMetadataForWriter metadata = processLogMetadatas(uri, logName, logIdentifier, metadatas, true); assertTrue(maxTxnIdData == metadata.getMaxTxIdData()); assertTrue(logSegmentsData == metadata.getMaxLSSNData()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/dc4548be/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java new file mode 100644 index 0000000..315b983 --- /dev/null +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestLogMetadata.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.metadata; + +import com.twitter.distributedlog.DLMTestUtil; +import org.junit.Test; + +import java.net.URI; + +import static com.twitter.distributedlog.metadata.LogMetadata.*; +import static org.junit.Assert.*; + +public class TestLogMetadata { + + @Test(timeout = 60000) + public void testGetPaths() throws Exception { + String rootPath = "/test-get-paths"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + String logRootPath = uri.getPath() + "/" + logName + "/" + logIdentifier; + String logSegmentName = "test-segment"; + + LogMetadata logMetadata = new LogMetadata(uri, logName, logIdentifier); + assertEquals("wrong log name", logName, logMetadata.getLogName()); + assertEquals("wrong root path", logRootPath, logMetadata.getLogRootPath()); + assertEquals("wrong log segments path", + logRootPath + LOGSEGMENTS_PATH, + logMetadata.getLogSegmentsPath()); + assertEquals("wrong log segment path", + logRootPath + LOGSEGMENTS_PATH + "/" + logSegmentName, + logMetadata.getLogSegmentPath(logSegmentName)); + assertEquals("wrong lock path", + logRootPath + LOCK_PATH, logMetadata.getLockPath()); + assertEquals("wrong max tx id path", + logRootPath + MAX_TXID_PATH, logMetadata.getMaxTxIdPath()); + assertEquals("wrong allocation path", + logRootPath + ALLOCATION_PATH, logMetadata.getAllocationPath()); + assertEquals("wrong qualified name", + logName + ":" + logIdentifier, logMetadata.getFullyQualifiedName()); + } + + +}