This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new bba1c6f DbLedgerStorage -- Main implementation
bba1c6f is described below
commit bba1c6f5d0d92d647467438876cfa32b699780f6
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Dec 15 09:29:10 2017 -0800
DbLedgerStorage -- Main implementation
`LedgerStorage` implementation that uses the EntryLogger mechanism but
stores the indexes in a RocksDB database.
In addition, there are also a WriteCache and a ReadCache that are used to
completely decouple the read & write paths.
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #855 from merlimat/db-ledger-storage-impl
---
bookkeeper-proto/pom.xml | 1 +
.../main/proto/DbLedgerStorageDataFormats.proto | 30 +
.../bookie/storage/ldb/DbLedgerStorage.java | 794 +++++++++++++++++++++
.../bookie/storage/ldb/EntryLocationIndex.java | 232 ++++++
.../bookie/storage/ldb/LedgerMetadataIndex.java | 262 +++++++
.../bookie/storage/ldb/LongPairWrapper.java | 69 ++
.../bookkeeper/bookie/storage/ldb/LongWrapper.java | 67 ++
.../storage/ldb/DbLedgerStorageBookieTest.java | 52 ++
.../bookie/storage/ldb/DbLedgerStorageTest.java | 423 +++++++++++
.../storage/ldb/DbLedgerStorageWriteCacheTest.java | 137 ++++
.../bookie/storage/ldb/EntryLocationIndexTest.java | 111 +++
.../main/resources/bookkeeper/findbugsExclude.xml | 4 +
12 files changed, 2182 insertions(+)
diff --git a/bookkeeper-proto/pom.xml b/bookkeeper-proto/pom.xml
index cc9169d..388bdcb 100644
--- a/bookkeeper-proto/pom.xml
+++ b/bookkeeper-proto/pom.xml
@@ -45,6 +45,7 @@
<!-- exclude generated file //-->
<exclude>**/DataFormats.java</exclude>
<exclude>**/BookkeeperProtocol.java</exclude>
+ <exclude>**/DbLedgerStorageDataFormats.java</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
new file mode 100644
index 0000000..e68b2d0
--- /dev/null
+++ b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+option java_package = "org.apache.bookkeeper.bookie.storage.ldb";
+option optimize_for = SPEED;
+
+/**
+ * Ledger metadata stored in the bookie.
+ *
+ * DbLedgerStorage looks this record up by ledger id through its ledger metadata index.
+ */
+message LedgerData {
+ // Value returned by DbLedgerStorage.ledgerExists()
+ required bool exists = 1;
+ // Value returned by DbLedgerStorage.isFenced()
+ required bool fenced = 2;
+ // Raw key bytes returned by DbLedgerStorage.readMasterKey()
+ required bytes masterKey = 3;
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
new file mode 100644
index 0000000..fa80016
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -0,0 +1,794 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.SortedMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of LedgerStorage that uses RocksDB to keep the indexes for
+ * entries stored in EntryLogs.
+ */
+public class DbLedgerStorage implements CompactableLedgerStorage {
+
+ private EntryLogger entryLogger;
+
+ private LedgerMetadataIndex ledgerIndex;
+ private EntryLocationIndex entryLocationIndex;
+
+ private GarbageCollectorThread gcThread;
+
+ // Write cache where all new entries are inserted into
+ protected WriteCache writeCache;
+
+ // Write cache that is used to swap with writeCache during flushes
+ protected WriteCache writeCacheBeingFlushed;
+
+ // Cache where we insert entries for speculative reading
+ private ReadCache readCache;
+
+ private final ReentrantReadWriteLock writeCacheMutex = new
ReentrantReadWriteLock();
+ private final Condition flushWriteCacheCondition =
writeCacheMutex.writeLock().newCondition();
+
+ protected final ReentrantLock flushMutex = new ReentrantLock();
+
+ protected final AtomicBoolean hasFlushBeenTriggered = new
AtomicBoolean(false);
+ private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
+
+ private final ExecutorService executor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
+
+ // Executor used for db index cleanup
+ private final ExecutorService cleanupExecutor = Executors
+ .newSingleThreadExecutor(new
DefaultThreadFactory("db-storage-cleanup"));
+
+ static final String WRITE_CACHE_MAX_SIZE_MB =
"dbStorage_writeCacheMaxSizeMb";
+ static final String READ_AHEAD_CACHE_BATCH_SIZE =
"dbStorage_readAheadCacheBatchSize";
+ static final String READ_AHEAD_CACHE_MAX_SIZE_MB =
"dbStorage_readAheadCacheMaxSizeMb";
+
+ private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
+ private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
+ private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+
+ private static final int MB = 1024 * 1024;
+
+ private final CopyOnWriteArrayList<LedgerDeletionListener>
ledgerDeletionListeners = Lists
+ .newCopyOnWriteArrayList();
+
+ private long writeCacheMaxSize;
+
+ private CheckpointSource checkpointSource = null;
+ private Checkpoint lastCheckpoint = Checkpoint.MIN;
+
+ private long readCacheMaxSize;
+ private int readAheadCacheBatchSize;
+
+ private StatsLogger stats;
+
+ private OpStatsLogger addEntryStats;
+ private OpStatsLogger readEntryStats;
+ private OpStatsLogger readCacheHitStats;
+ private OpStatsLogger readCacheMissStats;
+ private OpStatsLogger readAheadBatchCountStats;
+ private OpStatsLogger readAheadBatchSizeStats;
+ private OpStatsLogger flushStats;
+ private OpStatsLogger flushSizeStats;
+
+    /**
+     * Prepares the storage for use: checks the directory layout, sizes and allocates the
+     * write and read caches from the configuration, opens the ledger and entry-location
+     * indexes, creates the entry logger and the garbage collector thread, and registers
+     * the stats.
+     *
+     * @throws IllegalArgumentException
+     *             if the ledger dirs manager does not expose exactly one ledger directory
+     */
+    @Override
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
+            StatsLogger statsLogger) throws IOException {
+        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
+                "Db implementation only allows for one storage dir");
+
+        final String storageDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+        // The configured write cache budget is split evenly between the active cache
+        // and the cache that is being flushed
+        final long writeCacheMb = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB);
+        this.writeCacheMaxSize = writeCacheMb * MB;
+        this.writeCache = new WriteCache(this.writeCacheMaxSize / 2);
+        this.writeCacheBeingFlushed = new WriteCache(this.writeCacheMaxSize / 2);
+
+        this.checkpointSource = checkpointSource;
+
+        final long readCacheMb = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB);
+        this.readCacheMaxSize = readCacheMb * MB;
+        this.readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE,
+                DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
+        this.readCache = new ReadCache(this.readCacheMaxSize);
+
+        this.stats = statsLogger;
+
+        log.info("Started Db Ledger Storage");
+        log.info(" - Write cache size: {} MB", this.writeCacheMaxSize / MB);
+        log.info(" - Read Cache: {} MB", this.readCacheMaxSize / MB);
+        log.info(" - Read Ahead Batch size: : {}", this.readAheadCacheBatchSize);
+
+        // Both indexes live in the single storage directory
+        this.ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, storageDir, this.stats);
+        this.entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, storageDir,
+                this.stats);
+
+        this.entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        this.gcThread = new GarbageCollectorThread(conf, ledgerManager, this);
+
+        registerStats();
+    }
+
+    /**
+     * Registers the cache gauges and creates the per-operation stats loggers on the
+     * {@link StatsLogger} received in {@code initialize()}.
+     */
+    public void registerStats() {
+        // Gauges are sampled lazily, so they always observe the current cache instances
+        registerLongGauge("write-cache-size", () -> writeCache.size() + writeCacheBeingFlushed.size());
+        registerLongGauge("write-cache-count", () -> writeCache.count() + writeCacheBeingFlushed.count());
+        registerLongGauge("read-cache-size", () -> readCache.size());
+        registerLongGauge("read-cache-count", () -> readCache.count());
+
+        addEntryStats = stats.getOpStatsLogger("add-entry");
+        readEntryStats = stats.getOpStatsLogger("read-entry");
+        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
+        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
+        readAheadBatchCountStats = stats.getOpStatsLogger("readahead-batch-count");
+        readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
+        flushStats = stats.getOpStatsLogger("flush");
+        flushSizeStats = stats.getOpStatsLogger("flush-size");
+    }
+
+    /**
+     * Registers a {@code Long} gauge whose default value is 0 and whose sample is read
+     * from the given supplier each time the gauge is polled.
+     */
+    private void registerLongGauge(String name, java.util.function.LongSupplier sampler) {
+        stats.registerGauge(name, new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return sampler.getAsLong();
+            }
+        });
+    }
+
+    /**
+     * Starts the garbage collector thread created in {@code initialize()}.
+     */
+    @Override
+    public void start() {
+        this.gcThread.start();
+    }
+
+    /**
+     * Flushes pending entries and releases the resources held by this storage.
+     *
+     * <p>An {@link IOException} raised while flushing or closing is logged and not
+     * propagated. Both executors are shut down in every case, including when the
+     * flush fails or the calling thread is interrupted, so their worker threads are
+     * not leaked.
+     *
+     * @throws InterruptedException
+     *             if the thread is interrupted while shutting down the components
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            flush();
+
+            gcThread.shutdown();
+            entryLogger.shutdown();
+
+            // Give a pending index cleanup task a short time to complete before closing the indexes
+            cleanupExecutor.shutdown();
+            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
+
+            ledgerIndex.close();
+            entryLocationIndex.close();
+
+            writeCache.close();
+            writeCacheBeingFlushed.close();
+            readCache.close();
+        } catch (IOException e) {
+            log.error("Error closing db storage", e);
+        } finally {
+            // ExecutorService.shutdown() has no additional effect if already shut down,
+            // so it is safe to call it again here
+            cleanupExecutor.shutdown();
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Tells whether the ledger is marked as existing in the ledger metadata index.
+     *
+     * @return the {@code exists} flag of the stored record, or {@code false} when the
+     *         index reports the ledger as unknown
+     */
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        final LedgerData ledgerData;
+        try {
+            ledgerData = ledgerIndex.get(ledgerId);
+        } catch (Bookie.NoLedgerException nle) {
+            // ledger does not exist
+            return false;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Ledger exists. ledger: {} : {}", ledgerId, ledgerData.getExists());
+        }
+        return ledgerData.getExists();
+    }
+
+    /**
+     * Returns the {@code fenced} flag stored for the ledger in the ledger metadata index.
+     */
+    @Override
+    public boolean isFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("isFenced. ledger: {}", ledgerId);
+        }
+        final LedgerData ledgerData = ledgerIndex.get(ledgerId);
+        return ledgerData.getFenced();
+    }
+
+    /**
+     * Marks the ledger as fenced in the ledger metadata index.
+     *
+     * @return the value returned by the index for this update
+     */
+    @Override
+    public boolean setFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set fenced. ledger: {}", ledgerId);
+        }
+        final boolean result = ledgerIndex.setFenced(ledgerId);
+        return result;
+    }
+
+    /**
+     * Stores the master key of the ledger in the ledger metadata index.
+     */
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set master key. ledger: {}", ledgerId);
+        }
+        this.ledgerIndex.setMasterKey(ledgerId, masterKey);
+    }
+
+    /**
+     * Reads the master key of the ledger from the ledger metadata index.
+     *
+     * @return a byte array copy of the stored master key
+     */
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        if (log.isDebugEnabled()) {
+            log.debug("Read master key. ledger: {}", ledgerId);
+        }
+        final LedgerData ledgerData = ledgerIndex.get(ledgerId);
+        final ByteString masterKey = ledgerData.getMasterKey();
+        return masterKey.toByteArray();
+    }
+
+    /**
+     * Adds an entry to the write cache.
+     *
+     * <p>The ledger id and entry id are read from the first 16 bytes of the buffer. If
+     * the write cache refuses the entry, a flush is triggered and the insertion is
+     * retried through {@code triggerFlushAndAddEntry()}.
+     *
+     * @param entry
+     *            buffer that starts with the ledger id followed by the entry id
+     * @return the entry id read from the buffer
+     */
+    @Override
+    public long addEntry(ByteBuf entry) throws IOException {
+        final long startTime = MathUtils.nowInNano();
+
+        // Peek at the header, then rewind so the cache stores the complete entry
+        final long ledgerId = entry.readLong();
+        final long entryId = entry.readLong();
+        entry.resetReaderIndex();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add entry. {}@{}", ledgerId, entryId);
+        }
+
+        // Waits if the write cache is being switched for a flush
+        boolean inserted;
+        writeCacheMutex.readLock().lock();
+        try {
+            inserted = writeCache.put(ledgerId, entryId, entry);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        if (inserted) {
+            recordSuccessfulEvent(addEntryStats, startTime);
+            return entryId;
+        }
+
+        triggerFlushAndAddEntry(ledgerId, entryId, entry);
+        recordSuccessfulEvent(addEntryStats, startTime);
+        return entryId;
+    }
+
+    /**
+     * Slow path of {@code addEntry()}, used when the write cache rejected the entry.
+     *
+     * <p>Schedules a background flush unless one has already been triggered or is
+     * ongoing, waits up to 100 ms for the write cache to be swapped, and then retries
+     * the insertion in the new write cache.
+     *
+     * @throws IOException
+     *             if the cache was not swapped within the timeout, if the entry still
+     *             does not fit in the write cache, or if the thread was interrupted
+     *             while waiting (the interrupt flag is restored in that case)
+     */
+    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException {
+        // Write cache is full, we need to trigger a flush so that it gets rotated
+        writeCacheMutex.writeLock().lock();
+
+        try {
+            // If the flush has already been triggered or flush has already switched the
+            // cache, we don't need to trigger another flush
+            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
+                // Trigger an early flush in background
+                log.info("Write cache is full, triggering flush");
+                executor.execute(() -> {
+                    try {
+                        flush();
+                    } catch (IOException e) {
+                        log.error("Error during flush", e);
+                    }
+                });
+            }
+
+            // swapWriteCache() resets the flag and signals the condition once the cache is rotated
+            long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
+            while (hasFlushBeenTriggered.get()) {
+                if (timeoutNs <= 0L) {
+                    throw new IOException("Write cache was not flushed within the timeout, cannot add entry "
+                            + ledgerId + "@" + entryId);
+                }
+                timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs);
+            }
+
+            if (!writeCache.put(ledgerId, entryId, entry)) {
+                // Still wasn't able to cache entry
+                throw new IOException("Error while inserting entry in write cache " + ledgerId + "@" + entryId);
+            }
+
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so that callers up the stack can observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId, e);
+        } finally {
+            writeCacheMutex.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Reads an entry, looking in order at the write cache, the write cache being
+     * flushed, the read cache and finally the entry log.
+     *
+     * <p>When the entry has to be read from the entry log, it is inserted in the read
+     * cache and a read-ahead of the following entries is attempted.
+     *
+     * @param entryId
+     *            the entry to read; {@code BookieProtocol.LAST_ADD_CONFIRMED} is delegated
+     *            to {@code getLastEntry()}
+     * @throws NoEntryException
+     *             if the location index has no location for the entry
+     */
+    @Override
+    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
+        final long startTime = MathUtils.nowInNano();
+        if (log.isDebugEnabled()) {
+            log.debug("Get Entry: {}@{}", ledgerId, entryId);
+        }
+
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return getLastEntry(ledgerId);
+        }
+
+        ByteBuf cachedEntry;
+        writeCacheMutex.readLock().lock();
+        try {
+            // Recent entries first; during a flush the entry might be in the flush buffer instead
+            cachedEntry = writeCache.get(ledgerId, entryId);
+            if (cachedEntry == null) {
+                cachedEntry = writeCacheBeingFlushed.get(ledgerId, entryId);
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        if (cachedEntry == null) {
+            // Try reading from read-ahead cache
+            cachedEntry = readCache.get(ledgerId, entryId);
+        }
+
+        if (cachedEntry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return cachedEntry;
+        }
+
+        // Read from main storage
+        long entryLocation;
+        ByteBuf storedEntry;
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new NoEntryException(ledgerId, entryId);
+            }
+            storedEntry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } catch (NoEntryException e) {
+            recordFailedEvent(readEntryStats, startTime);
+            throw e;
+        }
+
+        readCache.put(ledgerId, entryId, storedEntry);
+
+        // Try to read more entries, starting right after this one
+        final long nextEntryLocation = entryLocation + 4 /* size header */ + storedEntry.readableBytes();
+        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return storedEntry;
+    }
+
+    /**
+     * Best-effort read-ahead: reads consecutive entries from the entry log, starting at
+     * {@code firstEntryLocation}, and inserts them in the read cache.
+     *
+     * <p>The read-ahead stops when the configured batch size is reached, when the
+     * location moves to a different entry log (upper 32 bits of the location), or when
+     * an entry of a different ledger is found. Any exception is only logged at debug
+     * level, since read-ahead is an optimization.
+     *
+     * @param orginalLedgerId
+     *            the ledger being read
+     * @param firstEntryId
+     *            id of the first entry expected at {@code firstEntryLocation}; not used
+     *            by the current implementation
+     * @param firstEntryLocation
+     *            entry log location where the read-ahead starts
+     */
+    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) {
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+            int count = 0;
+            long size = 0;
+
+            while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation);
+
+                try {
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != orginalLedgerId) {
+                        // Found an entry belonging to a different ledger, stopping read-ahead.
+                        // The buffer is released exactly once, by the finally block below.
+                        break;
+                    }
+
+                    // Insert entry in read cache
+                    readCache.put(orginalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    size += entry.readableBytes();
+
+                    // Advance past the 4 bytes size header and the entry payload
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+
+            readAheadBatchCountStats.registerSuccessfulValue(count);
+            readAheadBatchSizeStats.registerSuccessfulValue(size);
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}", orginalLedgerId, e);
+            }
+        }
+    }
+
+    /**
+     * Returns the last entry of a ledger, looking first at the write cache, then at
+     * the write cache being flushed and finally at the entry location index and the
+     * entry log.
+     */
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        final long startTime = MathUtils.nowInNano();
+
+        writeCacheMutex.readLock().lock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf cached = writeCache.getLastEntry(ledgerId);
+            if (cached != null) {
+                if (log.isDebugEnabled()) {
+                    // Decode the header only for logging, then rewind the buffer
+                    long foundLedgerId = cached.readLong(); // ledgedId
+                    long foundEntryId = cached.readLong();
+                    cached.resetReaderIndex();
+                    log.debug("Found last entry for ledger {} in write cache: {}@{}", ledgerId, foundLedgerId,
+                            foundEntryId);
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return cached;
+            }
+
+            // If there's a flush going on, the entry might be in the flush buffer
+            cached = writeCacheBeingFlushed.getLastEntry(ledgerId);
+            if (cached != null) {
+                if (log.isDebugEnabled()) {
+                    cached.readLong(); // ledgedId
+                    long foundEntryId = cached.readLong();
+                    cached.resetReaderIndex();
+                    log.debug("Found last entry for ledger {} in write cache being flushed: {}", ledgerId,
+                            foundEntryId);
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return cached;
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        // Search the last entry in storage
+        final long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
+        if (log.isDebugEnabled()) {
+            log.debug("Found last entry for ledger {} in db: {}", ledgerId, lastEntryId);
+        }
+
+        final long location = entryLocationIndex.getLocation(ledgerId, lastEntryId);
+        final ByteBuf stored = entryLogger.readEntry(ledgerId, lastEntryId, location);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return stored;
+    }
+
+ @VisibleForTesting
+ boolean isFlushRequired() {
+ writeCacheMutex.readLock().lock();
+ try {
+ return !writeCache.isEmpty();
+ } finally {
+ writeCacheMutex.readLock().unlock();
+ }
+ }
+
+    /**
+     * Persists all the entries currently in the write cache.
+     *
+     * <p>The write cache is swapped so that writes can continue, then every entry of
+     * the swapped cache is appended to the entry log and its location is added to a
+     * batch of the entry location index. After the entry log and both indexes are
+     * flushed, a cleanup of the deleted ledgers is scheduled in background and the
+     * flushed cache is cleared.
+     *
+     * <p>Nothing is done when the last completed checkpoint is newer than the
+     * requested one. The index batch is always closed, including when writing or
+     * flushing fails.
+     *
+     * @param checkpoint
+     *            the checkpoint up to which data must be persisted
+     * @throws IOException
+     *             if writing or flushing fails; unchecked exceptions raised during the
+     *             flush are wrapped in an {@code IOException}
+     */
+    @Override
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return;
+        }
+
+        long startTime = MathUtils.nowInNano();
+
+        // Only a single flush operation can happen at a time
+        flushMutex.lock();
+
+        try {
+            // Swap the write cache so that writes can continue to happen while the flush is
+            // ongoing
+            swapWriteCache();
+
+            long sizeToFlush = writeCacheBeingFlushed.size();
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(),
+                        sizeToFlush / 1024.0 / 1024);
+            }
+
+            // Write all the pending entries into the entry logger and collect the offset
+            // position for each entry
+            Batch batch = entryLocationIndex.newBatch();
+            long batchFlushStartTime;
+            try {
+                writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
+                    try {
+                        long location = entryLogger.addEntry(ledgerId, entry, true);
+                        entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+                // Entries must be durable in the entry log before their locations are indexed
+                entryLogger.flush();
+
+                batchFlushStartTime = System.nanoTime();
+                batch.flush();
+            } finally {
+                // Release the batch even when adding entries or flushing fails
+                batch.close();
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("DB batch flushed time : {} s",
+                        MathUtils.elapsedNanos(batchFlushStartTime) / (double) TimeUnit.SECONDS.toNanos(1));
+            }
+
+            ledgerIndex.flush();
+
+            cleanupExecutor.execute(() -> {
+                // There can only be one single cleanup task running because the cleanupExecutor
+                // is single-threaded
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removing deleted ledgers from db indexes");
+                    }
+
+                    entryLocationIndex.removeOffsetFromDeletedLedgers();
+                    ledgerIndex.removeDeletedLedgers();
+                } catch (Throwable t) {
+                    log.warn("Failed to cleanup db indexes", t);
+                }
+            });
+
+            lastCheckpoint = thisCheckpoint;
+
+            // Discard all the entry from the write cache, since they're now persisted
+            writeCacheBeingFlushed.clear();
+
+            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / (double) TimeUnit.SECONDS.toNanos(1);
+            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / flushTimeSeconds;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing done time {} s -- Written {} MB/s", flushTimeSeconds, flushThroughput);
+            }
+
+            recordSuccessfulEvent(flushStats, startTime);
+            flushSizeStats.registerSuccessfulValue(sizeToFlush);
+        } catch (IOException e) {
+            // Leave IOException as it is
+            throw e;
+        } catch (RuntimeException e) {
+            // Wrap unchecked exceptions
+            throw new IOException(e);
+        } finally {
+            try {
+                isFlushOngoing.set(false);
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+    }
+
+ /**
+ * Swap the current write cache with the replacement cache.
+ */
+ private void swapWriteCache() {
+ writeCacheMutex.writeLock().lock();
+ try {
+ // First, swap the current write-cache map with an empty one so
that writes will
+ // go on unaffected. Only a single flush is happening at the same
time
+ WriteCache tmp = writeCacheBeingFlushed;
+ writeCacheBeingFlushed = writeCache;
+ writeCache = tmp;
+
+ // since the cache is switched, we can allow flush to be triggered
+ hasFlushBeenTriggered.set(false);
+ flushWriteCacheCondition.signalAll();
+ } finally {
+ try {
+ isFlushOngoing.set(true);
+ } finally {
+ writeCacheMutex.writeLock().unlock();
+ }
+ }
+ }
+
+    /**
+     * Flushes the storage by running a checkpoint up to {@code Checkpoint.MAX}.
+     */
+    @Override
+    public void flush() throws IOException {
+        final Checkpoint everything = Checkpoint.MAX;
+        checkpoint(everything);
+    }
+
+    /**
+     * Deletes a ledger: drops its entries from the active write cache, removes it
+     * from both indexes and then notifies the registered deletion listeners.
+     */
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Deleting ledger {}", ledgerId);
+        }
+
+        // Delete entries from this ledger that are still in the write cache
+        writeCacheMutex.readLock().lock();
+        try {
+            writeCache.deleteLedger(ledgerId);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        entryLocationIndex.delete(ledgerId);
+        ledgerIndex.delete(ledgerId);
+
+        for (LedgerDeletionListener listener : ledgerDeletionListeners) {
+            listener.ledgerDeleted(ledgerId);
+        }
+    }
+
+    /**
+     * Returns the active ledgers in the given range, as reported by the ledger metadata index.
+     */
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+        final Iterable<Long> activeLedgers = ledgerIndex.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
+        return activeLedgers;
+    }
+
+    /**
+     * Stores the new locations of entries in the entry location index, after
+     * flushing the storage.
+     */
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        // Trigger a flush to have all the entries being compacted in the db storage
+        this.flush();
+
+        this.entryLocationIndex.updateLocations(locations);
+    }
+
+    /**
+     * Returns the entry logger created in {@code initialize()}.
+     */
+    @Override
+    public EntryLogger getEntryLogger() {
+        return this.entryLogger;
+    }
+
+    /**
+     * Not supported by this storage implementation.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this storage implementation.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this storage implementation.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this storage implementation.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        // No-op. Location index is already flushed in updateEntriesLocations() call
+    }
+
+    /**
+     * Add an already existing ledger to the index.
+     *
+     * <p>This method is only used as a tool to help the migration from
+     * InterleaveLedgerStorage to DbLedgerStorage
+     *
+     * <p>The ledger metadata is written first, then all the entry locations are added
+     * through a single index batch. The batch is always closed, including when adding
+     * a location or flushing fails.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param isFenced
+     *            whether the ledger must be recorded as fenced
+     * @param masterKey
+     *            the master key of the ledger
+     * @param entries
+     *            the pages of entries, each one a map of entryId -> location
+     * @return the number of entry locations added to the index
+     * @throws Exception
+     *             if the metadata or the locations cannot be written
+     */
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        LedgerData ledgerData = LedgerData.newBuilder().setExists(true).setFenced(isFenced)
+                .setMasterKey(ByteString.copyFrom(masterKey)).build();
+        ledgerIndex.set(ledgerId, ledgerData);
+        AtomicLong numberOfEntries = new AtomicLong();
+
+        // Iterate over all the entries pages
+        Batch batch = entryLocationIndex.newBatch();
+        try {
+            entries.forEach(map -> {
+                map.forEach((entryId, location) -> {
+                    try {
+                        entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    numberOfEntries.incrementAndGet();
+                });
+            });
+
+            batch.flush();
+        } finally {
+            // Release the batch even when adding a location or flushing fails
+            batch.close();
+        }
+
+        return numberOfEntries.get();
+    }
+
+    /**
+     * Adds a listener that {@code deleteLedger()} notifies after a ledger is deleted.
+     */
+    @Override
+    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+        this.ledgerDeletionListeners.add(listener);
+    }
+
+    /**
+     * Returns the entry location index created in {@code initialize()}.
+     */
+    public EntryLocationIndex getEntryLocationIndex() {
+        return this.entryLocationIndex;
+    }
+
+ private void recordSuccessfulEvent(OpStatsLogger logger, long
startTimeNanos) {
+ logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
+ }
+
+ private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
+ logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(DbLedgerStorage.class);
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
new file mode 100644
index 0000000..53e37a2
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -0,0 +1,232 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index of the entry locations in the EntryLogger.
+ *
+ * <p>Each entry has its own record in the underlying key-value storage: the key is the
+ * 16-byte pair (ledgerId, entryId) and the value is the 8-byte location of that entry.
+ */
+public class EntryLocationIndex implements Closeable {
+
+ private final KeyValueStorage locationsDb;
+ private final ConcurrentLongHashSet deletedLedgers = new
ConcurrentLongHashSet();
+
+ private StatsLogger stats;
+
+    /**
+     * Opens the key-value storage holding the entry locations and registers the stats gauge.
+     *
+     * @param conf
+     *            server configuration, passed through to the storage factory
+     * @param storageFactory
+     *            factory used to create the key-value storage
+     * @param basePath
+     *            directory under which the {@code locations} database path is resolved
+     * @param stats
+     *            stats logger on which the gauge is registered
+     * @throws IOException
+     *             declared for failures while creating the key-value storage
+     */
+    public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String locationsDbPath = FileSystems.getDefault().getPath(basePath, "locations").toFile().toString();
+        locationsDb = storageFactory.newKeyValueStorage(locationsDbPath, DbConfigType.Huge, conf);
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    /**
+     * Registers the {@code entries-count} gauge, which samples the number of records reported by
+     * the locations database.
+     */
+    public void registerStats() {
+        stats.registerGauge("entries-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                try {
+                    return locationsDb.count();
+                } catch (IOException e) {
+                    // The count could not be read: report -1 instead of propagating the error
+                    return -1L;
+                }
+            }
+        });
+    }
+
+    /**
+     * Closes the underlying locations database.
+     */
+    @Override
+    public void close() throws IOException {
+        locationsDb.close();
+    }
+
+    /**
+     * Looks up the location stored for an entry.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param entryId
+     *            the entry id
+     * @return the stored location, or 0 when the database has no record for the entry
+     */
+    public long getLocation(long ledgerId, long entryId) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get();
+
+        try {
+            // A negative result from get() is handled as "no record for this key"
+            if (locationsDb.get(key.array, value.array) < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Entry not found {}@{} in db index", ledgerId, entryId);
+                }
+                return 0;
+            }
+
+            return value.getValue();
+        } finally {
+            // Hand both wrappers back to their recyclers on every path
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    /**
+     * Returns the id of the last entry indexed for a ledger.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @return the last entry id, or -1 when the ledger is in the set of deleted ledgers
+     * @throws IOException
+     *             a {@code Bookie.NoEntryException} is thrown by the internal lookup when no entry is found
+     */
+    public long getLastEntryInLedger(long ledgerId) throws IOException {
+        if (deletedLedgers.contains(ledgerId)) {
+            // Ledger already deleted
+            return -1;
+        }
+
+        return getLastEntryInLedgerInternal(ledgerId);
+    }
+
+    /**
+     * Finds the last entry of a ledger by asking the database for the floor of the key
+     * (ledgerId, Long.MAX_VALUE) and checking that the key found belongs to the same ledger.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @return the entry id stored in the key that was found
+     * @throws IOException
+     *             a {@code Bookie.NoEntryException} when no key is found or the key found belongs to
+     *             another ledger
+     */
+    private long getLastEntryInLedgerInternal(long ledgerId) throws IOException {
+        LongPairWrapper maxEntryId = LongPairWrapper.get(ledgerId, Long.MAX_VALUE);
+
+        // Search the last entry in storage
+        Entry<byte[], byte[]> entry;
+        try {
+            entry = locationsDb.getFloor(maxEntryId.array);
+        } finally {
+            // Recycle the lookup key even when the lookup fails
+            maxEntryId.recycle();
+        }
+
+        if (entry == null) {
+            throw new Bookie.NoEntryException(ledgerId, -1);
+        }
+
+        // The key is laid out as ledgerId at offset 0 followed by entryId at offset 8
+        long foundLedgerId = ArrayUtil.getLong(entry.getKey(), 0);
+        long lastEntryId = ArrayUtil.getLong(entry.getKey(), 8);
+
+        if (foundLedgerId != ledgerId) {
+            // The floor key belongs to a different ledger: this ledger has no entries in the index
+            throw new Bookie.NoEntryException(ledgerId, -1);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Found last page in storage db for ledger {} - last entry: {}", ledgerId, lastEntryId);
+        }
+        return lastEntryId;
+    }
+
+    /**
+     * Stores the location of a single entry, using a dedicated batch that is flushed immediately.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param entryId
+     *            the entry id
+     * @param location
+     *            the location to associate with the entry
+     */
+    public void addLocation(long ledgerId, long entryId, long location) throws IOException {
+        Batch batch = locationsDb.newBatch();
+        try {
+            addLocation(batch, ledgerId, entryId, location);
+            batch.flush();
+        } finally {
+            // Release the batch even when the put or the flush fails
+            batch.close();
+        }
+    }
+
+    /**
+     * @return a new batch created on the locations database
+     */
+    public Batch newBatch() {
+        return locationsDb.newBatch();
+    }
+
+    /**
+     * Adds the location of an entry to the given batch. The batch is neither flushed nor closed here.
+     *
+     * @param batch
+     *            the batch receiving the put
+     * @param ledgerId
+     *            the ledger id
+     * @param entryId
+     *            the entry id
+     * @param location
+     *            the location to associate with the entry
+     */
+    public void addLocation(Batch batch, long ledgerId, long entryId, long location) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get(location);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add location - ledger: {} -- entry: {} -- location: {}", ledgerId, entryId, location);
+        }
+
+        try {
+            batch.put(key.array, value.array);
+        } finally {
+            // Hand both wrappers back to their recyclers, whether or not the put succeeded
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    /**
+     * Writes a set of entry locations in a single batch.
+     *
+     * @param newLocations
+     *            the locations to store; each one is written under its (ledger, entry) key
+     */
+    public void updateLocations(Iterable<EntryLocation> newLocations) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Update locations -- {}", Iterables.size(newLocations));
+        }
+
+        Batch batch = newBatch();
+        try {
+            // Update all the ledger index pages with the new locations
+            for (EntryLocation e : newLocations) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Update location - ledger: {} -- entry: {}", e.ledger, e.entry);
+                }
+
+                addLocation(batch, e.ledger, e.entry, e.location);
+            }
+
+            batch.flush();
+        } finally {
+            // Release the batch even when a put or the flush fails
+            batch.close();
+        }
+    }
+
+    /**
+     * Marks a ledger as deleted by adding its id to {@code deletedLedgers}.
+     *
+     * <p>No record is removed from the database by this method; the pending set is consumed by
+     * {@link #removeOffsetFromDeletedLedgers()}.
+     *
+     * @param ledgerId
+     *            the ledger id
+     */
+    public void delete(long ledgerId) throws IOException {
+        // Only remember the ledger id here; the index records are left untouched by this call
+        deletedLedgers.add(ledgerId);
+    }
+
+    /**
+     * Removes from the database the index records of every ledger currently in the pending-deletion
+     * set, then drops those ledgers from the set.
+     *
+     * <p>One range deletion per ledger is added to a single batch, with keys (ledgerId, 0) and
+     * (ledgerId, Long.MAX_VALUE) as the range bounds. The ledgers are removed from the pending set
+     * only after the batch has been flushed.
+     */
+    public void removeOffsetFromDeletedLedgers() throws IOException {
+        Set<Long> ledgersToDelete = deletedLedgers.items();
+
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
+
+        // Acquire the recyclable keys only once there is work to do, so the early return above
+        // cannot leave wrappers un-recycled
+        LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
+        LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
+        Batch batch = locationsDb.newBatch();
+
+        try {
+            for (long ledgerId : ledgersToDelete) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Deleting indexes from ledger {}", ledgerId);
+                }
+
+                firstKeyWrapper.set(ledgerId, 0);
+                lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
+
+                batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
+            }
+
+            batch.flush();
+
+            // Removed from pending set
+            for (long ledgerId : ledgersToDelete) {
+                deletedLedgers.remove(ledgerId);
+            }
+        } finally {
+            firstKeyWrapper.recycle();
+            lastKeyWrapper.recycle();
+            batch.close();
+        }
+    }
+
+ private static final Logger log =
LoggerFactory.getLogger(EntryLocationIndex.class);
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
new file mode 100644
index 0000000..04bf32d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index for the ledgers metadata.
+ *
+ * <p>The key is the ledgerId and the value is the {@link LedgerData} content.
+ */
+public class LedgerMetadataIndex implements Closeable {
+ // Contains all ledgers stored in the bookie
+ private final ConcurrentLongHashMap<LedgerData> ledgers;
+ private final AtomicInteger ledgersCount;
+
+ private final KeyValueStorage ledgersDb;
+ private StatsLogger stats;
+
+ // Holds ledger modifications applied in memory map, and pending to be
flushed on db
+ private final ConcurrentLinkedQueue<Entry<Long, LedgerData>>
pendingLedgersUpdates;
+
+ // Holds ledger ids that were delete from memory map, and pending to be
flushed on db
+ private final ConcurrentLinkedQueue<Long> pendingDeletedLedgers;
+
+    /**
+     * Opens the ledgers database, loads every record it contains into the in-memory map and
+     * registers the stats gauge.
+     *
+     * @param conf
+     *            server configuration, passed through to the storage factory
+     * @param storageFactory
+     *            factory used to create the key-value storage
+     * @param basePath
+     *            directory under which the {@code ledgers} database path is resolved
+     * @param stats
+     *            stats logger on which the gauge is registered
+     * @throws IOException
+     *             declared for failures while opening or reading the database
+     */
+    public LedgerMetadataIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String ledgersPath = FileSystems.getDefault().getPath(basePath, "ledgers").toFile().toString();
+        ledgersDb = storageFactory.newKeyValueStorage(ledgersPath, DbConfigType.Small, conf);
+
+        ledgers = new ConcurrentLongHashMap<>();
+        ledgersCount = new AtomicInteger();
+
+        // Read all ledgers from db
+        CloseableIterator<Entry<byte[], byte[]>> iterator = ledgersDb.iterator();
+        try {
+            while (iterator.hasNext()) {
+                Entry<byte[], byte[]> entry = iterator.next();
+                // The key holds the ledger id, the value is a serialized LedgerData message
+                long ledgerId = ArrayUtil.getLong(entry.getKey(), 0);
+                LedgerData ledgerData = LedgerData.parseFrom(entry.getValue());
+                ledgers.put(ledgerId, ledgerData);
+                ledgersCount.incrementAndGet();
+            }
+        } finally {
+            iterator.close();
+        }
+
+        this.pendingLedgersUpdates = new ConcurrentLinkedQueue<Entry<Long, LedgerData>>();
+        this.pendingDeletedLedgers = new ConcurrentLinkedQueue<Long>();
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    /**
+     * Registers the {@code ledgers-count} gauge, which samples the {@code ledgersCount} counter.
+     */
+    public void registerStats() {
+        stats.registerGauge("ledgers-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return (long) ledgersCount.get();
+            }
+        });
+    }
+
+    /**
+     * Closes the underlying ledgers database.
+     */
+    @Override
+    public void close() throws IOException {
+        ledgersDb.close();
+    }
+
+    /**
+     * Returns the metadata held in the in-memory map for a ledger.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @return the ledger metadata, never null
+     * @throws IOException
+     *             a {@code Bookie.NoLedgerException} when the map has no value for the ledger
+     */
+    public LedgerData get(long ledgerId) throws IOException {
+        LedgerData ledgerData = ledgers.get(ledgerId);
+        if (ledgerData == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger not found {}", ledgerId);
+            }
+            throw new Bookie.NoLedgerException(ledgerId);
+        }
+
+        return ledgerData;
+    }
+
+    /**
+     * Stores the metadata of a ledger in the in-memory map and queues it to be written to the
+     * database. The stored copy always has its {@code exists} flag set to true.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param ledgerData
+     *            the metadata to store
+     */
+    public void set(long ledgerId, LedgerData ledgerData) throws IOException {
+        ledgerData = LedgerData.newBuilder(ledgerData).setExists(true).build();
+
+        // put() returns null when the map had no previous value: only then the counter grows
+        if (ledgers.put(ledgerId, ledgerData) == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Added new ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        }
+
+        // Queue the update and drop any pending deletion for the same ledger
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, ledgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+    }
+
+    /**
+     * Removes a ledger from the in-memory map and queues its deletion from the database.
+     *
+     * @param ledgerId
+     *            the ledger id
+     */
+    public void delete(long ledgerId) throws IOException {
+        // Only decrement the counter when the ledger was actually present in the map
+        if (ledgers.remove(ledgerId) != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Removed ledger {}", ledgerId);
+            }
+            ledgersCount.decrementAndGet();
+        }
+
+        // Queue the deletion and drop any pending update for the same ledger
+        pendingDeletedLedgers.add(ledgerId);
+        pendingLedgersUpdates.removeIf(e -> e.getKey() == ledgerId);
+    }
+
+    /**
+     * Returns the ids of the ledgers in the in-memory map that fall within a range.
+     *
+     * @param firstLedgerId
+     *            lower bound of the range, inclusive
+     * @param lastLedgerId
+     *            upper bound of the range, exclusive
+     * @return the ledger ids {@code id} such that {@code firstLedgerId <= id < lastLedgerId}
+     */
+    public Iterable<Long> getActiveLedgersInRange(final long firstLedgerId, final long lastLedgerId)
+            throws IOException {
+        return Iterables.filter(ledgers.keys(), new Predicate<Long>() {
+            @Override
+            public boolean apply(Long ledgerId) {
+                return ledgerId >= firstLedgerId && ledgerId < lastLedgerId;
+            }
+        });
+    }
+
+    /**
+     * Marks a ledger as fenced.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @return false when the ledger was already fenced, true when the fenced flag was set by this call
+     * @throws IOException
+     *             a {@code Bookie.NoLedgerException} (from {@link #get(long)}) when the ledger is not in the map
+     */
+    public boolean setFenced(long ledgerId) throws IOException {
+        LedgerData ledgerData = get(ledgerId);
+        if (ledgerData.getFenced()) {
+            return false;
+        }
+
+        LedgerData newLedgerData = LedgerData.newBuilder(ledgerData).setFenced(true).build();
+
+        // put() returning null means the map entry disappeared after the get() above
+        if (ledgers.put(ledgerId, newLedgerData) == null) {
+            // Ledger had been deleted
+            if (log.isDebugEnabled()) {
+                log.debug("Re-inserted fenced ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Set fenced ledger {}", ledgerId);
+            }
+        }
+
+        // Queue the update and drop any pending deletion for the same ledger
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, newLedgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+        return true;
+    }
+
+ public void setMasterKey(long ledgerId, byte[] masterKey) throws
IOException {
+ LedgerData ledgerData = ledgers.get(ledgerId);
+ if (ledgerData == null) {
+ // New ledger inserted
+ ledgerData =
LedgerData.newBuilder().setExists(true).setFenced(false)
+ .setMasterKey(ByteString.copyFrom(masterKey)).build();
+ if (log.isDebugEnabled()) {
+ log.debug("Inserting new ledger {}", ledgerId);
+ }
+ } else {
+ byte[] storedMasterKey = ledgerData.getMasterKey().toByteArray();
+ if (ArrayUtil.isArrayAllZeros(storedMasterKey)) {
+ // update master key of the ledger
+ ledgerData =
LedgerData.newBuilder(ledgerData).setMasterKey(ByteString.copyFrom(masterKey)).build();
+ if (log.isDebugEnabled()) {
+ log.debug("Replace old master key {} with new master key
{}", storedMasterKey, masterKey);
+ }
+ } else if (!Arrays.equals(storedMasterKey, masterKey) &&
!ArrayUtil.isArrayAllZeros(masterKey)) {
+ log.warn("Ledger {} masterKey in db can only be set once.",
ledgerId);
+ throw new
IOException(BookieException.create(BookieException.Code.IllegalOpException));
+ }
+ }
+
+ if (ledgers.put(ledgerId, ledgerData) == null) {
+ ledgersCount.incrementAndGet();
+ }
+
+ pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId,
ledgerData));
+ pendingDeletedLedgers.remove(ledgerId);
+ }
+
+    /**
+     * Flushes all pending changes.
+     *
+     * <p>Every queued ledger update is written to the database, then the database is synced.
+     */
+    public void flush() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        try {
+            int updatedLedgers = 0;
+            Entry<Long, LedgerData> entry;
+            // Test the result of poll() instead of calling isEmpty() first: delete() may remove
+            // elements from this queue in between, in which case poll() returns null
+            while ((entry = pendingLedgersUpdates.poll()) != null) {
+                key.set(entry.getKey());
+                byte[] value = entry.getValue().toByteArray();
+                ledgersDb.put(key.array, value);
+                ++updatedLedgers;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Persisting updates to {} ledgers", updatedLedgers);
+            }
+
+            ledgersDb.sync();
+        } finally {
+            // Recycle the key even when a put or the sync fails
+            key.recycle();
+        }
+    }
+
+    /**
+     * Deletes from the database every ledger queued for deletion, then syncs the database.
+     */
+    public void removeDeletedLedgers() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        try {
+            int deletedLedgers = 0;
+            Long ledgerId;
+            // Test the result of poll() instead of calling isEmpty() first: set(), setFenced() and
+            // setMasterKey() may remove ids from this queue in between, and unboxing the null
+            // returned by poll() would throw a NullPointerException
+            while ((ledgerId = pendingDeletedLedgers.poll()) != null) {
+                key.set(ledgerId);
+                ledgersDb.delete(key.array);
+                deletedLedgers++;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Persisting deletes of ledgers {}", deletedLedgers);
+            }
+
+            ledgersDb.sync();
+        } finally {
+            // Recycle the key even when a delete or the sync fails
+            key.recycle();
+        }
+    }
+
+ private static final Logger log =
LoggerFactory.getLogger(LedgerMetadataIndex.class);
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
new file mode 100644
index 0000000..993297c
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Recyclable holder for two longs, kept serialized back-to-back in a 16-byte array: the first
+ * long at offset 0 and the second one at offset 8.
+ */
+class LongPairWrapper {
+
+    private static final Recycler<LongPairWrapper> RECYCLER = new Recycler<LongPairWrapper>() {
+        @Override
+        protected LongPairWrapper newObject(Handle<LongPairWrapper> recyclerHandle) {
+            return new LongPairWrapper(recyclerHandle);
+        }
+    };
+
+    // Serialized form of the pair, accessed directly by the users of this class
+    final byte[] array = new byte[16];
+
+    // Handle used to give this instance back to the recycler
+    private final Handle<LongPairWrapper> handle;
+
+    private LongPairWrapper(Handle<LongPairWrapper> handle) {
+        this.handle = handle;
+    }
+
+    /**
+     * Obtains an instance from the recycler and initializes it with the given pair.
+     */
+    public static LongPairWrapper get(long first, long second) {
+        LongPairWrapper wrapper = RECYCLER.get();
+        wrapper.set(first, second);
+        return wrapper;
+    }
+
+    /**
+     * Overwrites both longs held in the array.
+     */
+    public void set(long first, long second) {
+        ArrayUtil.setLong(array, 0, first);
+        ArrayUtil.setLong(array, 8, second);
+    }
+
+    public long getFirst() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    public long getSecond() {
+        return ArrayUtil.getLong(array, 8);
+    }
+
+    /**
+     * Gives this instance back to the recycler.
+     */
+    public void recycle() {
+        handle.recycle(this);
+    }
+}
\ No newline at end of file
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
new file mode 100644
index 0000000..144b9ee
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Recyclable holder for a single long, kept serialized in an 8-byte array.
+ */
+class LongWrapper {
+
+    private static final Recycler<LongWrapper> RECYCLER = new Recycler<LongWrapper>() {
+        @Override
+        protected LongWrapper newObject(Handle<LongWrapper> recyclerHandle) {
+            return new LongWrapper(recyclerHandle);
+        }
+    };
+
+    // Serialized form of the value, accessed directly by the users of this class
+    final byte[] array = new byte[8];
+
+    // Handle used to give this instance back to the recycler
+    private final Handle<LongWrapper> handle;
+
+    private LongWrapper(Handle<LongWrapper> handle) {
+        this.handle = handle;
+    }
+
+    /**
+     * Obtains an instance from the recycler without touching the content of its array.
+     */
+    public static LongWrapper get() {
+        return RECYCLER.get();
+    }
+
+    /**
+     * Obtains an instance from the recycler and initializes it with the given value.
+     */
+    public static LongWrapper get(long value) {
+        LongWrapper wrapper = RECYCLER.get();
+        wrapper.set(value);
+        return wrapper;
+    }
+
+    public void set(long value) {
+        ArrayUtil.setLong(array, 0, value);
+    }
+
+    public long getValue() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    /**
+     * Gives this instance back to the recycler.
+     */
+    public void recycle() {
+        handle.recycle(this);
+    }
+}
\ No newline at end of file
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
new file mode 100644
index 0000000..da7f32a
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test for {@link DbLedgerStorage} exercised through a BookKeeper client against a test cluster.
+ */
+public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase {
+
+    // Configures the cluster bookies to use DbLedgerStorage, with flush interval and GC wait time
+    // both set to 60000
+    public DbLedgerStorageBookieTest() {
+        super(1);
+        baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        baseConf.setFlushInterval(60000);
+        baseConf.setGcWaitTime(60000);
+    }
+
+    /**
+     * Opens a ledger to which no entry was ever written and checks that it is reported as empty.
+     */
+    @Test
+    public void testRecoveryEmptyLedger() throws Exception {
+        LedgerHandle lh1 = bkc.createLedger(1, 1, DigestType.MAC, new byte[0]);
+
+        // Force ledger close & recovery
+        LedgerHandle lh2 = bkc.openLedger(lh1.getId(), DigestType.MAC, new byte[0]);
+
+        assertEquals(0, lh2.getLength());
+        assertEquals(-1, lh2.getLastAddConfirmed());
+    }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
new file mode 100644
index 0000000..1e91c2c
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -0,0 +1,423 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageTest {
+
+ private DbLedgerStorage storage;
+ private File tmpDir;
+
+    /**
+     * Creates a fresh temporary ledger directory and starts a bookie on it, keeping a reference to
+     * its {@link DbLedgerStorage}.
+     */
+    @Before
+    public void setup() throws Exception {
+        // Create the directory in one step instead of createTempFile() + delete() + mkdir(), which
+        // leaves a window where the name can be taken and silently ignores failures
+        tmpDir = java.nio.file.Files.createTempDirectory("bkTest").toFile();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    /**
+     * Shuts the storage down and removes the temporary ledger directory with all its content.
+     */
+    @After
+    public void teardown() throws Exception {
+        storage.shutdown();
+        // File.delete() alone cannot remove a directory that still contains files
+        deleteRecursively(tmpDir);
+    }
+
+    // Best-effort removal of a file, or of a directory together with everything below it
+    private static void deleteRecursively(File file) {
+        File[] children = file.listFiles();
+        if (children != null) {
+            for (File child : children) {
+                deleteRecursively(child);
+            }
+        }
+        file.delete();
+    }
+
+ @Test
+ public void simple() throws Exception {
+ assertEquals(false, storage.ledgerExists(3));
+ try {
+ storage.isFenced(3);
+ fail("should have failed");
+ } catch (Bookie.NoLedgerException nle) {
+ // OK
+ }
+ assertEquals(false, storage.ledgerExists(3));
+ try {
+ storage.setFenced(3);
+ fail("should have failed");
+ } catch (Bookie.NoLedgerException nle) {
+ // OK
+ }
+ storage.setMasterKey(3, "key".getBytes());
+ try {
+ storage.setMasterKey(3, "other-key".getBytes());
+ fail("should have failed");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getCause() instanceof
BookieException.BookieIllegalOpException);
+ }
+ // setting the same key is NOOP
+ storage.setMasterKey(3, "key".getBytes());
+ assertEquals(true, storage.ledgerExists(3));
+ assertEquals(true, storage.setFenced(3));
+ assertEquals(true, storage.isFenced(3));
+ assertEquals(false, storage.setFenced(3));
+
+ storage.setMasterKey(4, "key".getBytes());
+ assertEquals(false, storage.isFenced(4));
+ assertEquals(true, storage.ledgerExists(4));
+
+ assertEquals("key", new String(storage.readMasterKey(4)));
+
+ assertEquals(Lists.newArrayList(4L, 3L),
Lists.newArrayList(storage.getActiveLedgersInRange(0, 100)));
+ assertEquals(Lists.newArrayList(4L, 3L),
Lists.newArrayList(storage.getActiveLedgersInRange(3, 100)));
+ assertEquals(Lists.newArrayList(3L),
Lists.newArrayList(storage.getActiveLedgersInRange(0, 4)));
+
+ // Add / read entries
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(4); // ledger id
+ entry.writeLong(1); // entry id
+ entry.writeBytes("entry-1".getBytes());
+
+ assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
+
+ assertEquals(1, storage.addEntry(entry));
+
+ assertEquals(true, ((DbLedgerStorage) storage).isFlushRequired());
+
+ // Read from write cache
+ ByteBuf res = storage.getEntry(4, 1);
+ assertEquals(entry, res);
+
+ storage.flush();
+
+ assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
+
+ // Read from db
+ res = storage.getEntry(4, 1);
+ assertEquals(entry, res);
+
+ try {
+ storage.getEntry(4, 2);
+ fail("Should have thrown exception");
+ } catch (NoEntryException e) {
+ // ok
+ }
+
+ ByteBuf entry2 = Unpooled.buffer(1024);
+ entry2.writeLong(4); // ledger id
+ entry2.writeLong(2); // entry id
+ entry2.writeBytes("entry-2".getBytes());
+
+ storage.addEntry(entry2);
+
+ // Read last entry in ledger
+ res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+ assertEquals(entry2, res);
+
+ ByteBuf entry3 = Unpooled.buffer(1024);
+ entry3.writeLong(4); // ledger id
+ entry3.writeLong(3); // entry id
+ entry3.writeBytes("entry-3".getBytes());
+ storage.addEntry(entry3);
+
+ ByteBuf entry4 = Unpooled.buffer(1024);
+ entry4.writeLong(4); // ledger id
+ entry4.writeLong(4); // entry id
+ entry4.writeBytes("entry-4".getBytes());
+ storage.addEntry(entry4);
+
+ res = storage.getEntry(4, 4);
+ assertEquals(entry4, res);
+
+ // Delete
+ assertEquals(true, storage.ledgerExists(4));
+ storage.deleteLedger(4);
+ assertEquals(false, storage.ledgerExists(4));
+
+ // Should not throw exception event if the ledger was deleted
+ storage.getEntry(4, 4);
+
+ storage.addEntry(Unpooled.wrappedBuffer(entry2));
+ res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+ assertEquals(entry4, res);
+
+ // Get last entry from storage
+ storage.flush();
+
+ try {
+ storage.getEntry(4, 4);
+ fail("Should have thrown exception since the ledger was deleted");
+ } catch (NoEntryException e) {
+ // ok
+ }
+ }
+
+    /**
+     * Rewrites an entry directly through the entry logger, updates its location in the storage and
+     * checks that a read returns the rewritten content.
+     */
+    @Test
+    public void testBookieCompaction() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+
+        ByteBuf entry3 = Unpooled.buffer(1024);
+        entry3.writeLong(4); // ledger id
+        entry3.writeLong(3); // entry id
+        entry3.writeBytes("entry-3".getBytes());
+        storage.addEntry(entry3);
+
+        // Simulate bookie compaction
+        EntryLogger entryLogger = ((DbLedgerStorage) storage).getEntryLogger();
+        // Rewrite entry-3
+        ByteBuf newEntry3 = Unpooled.buffer(1024);
+        newEntry3.writeLong(4); // ledger id
+        newEntry3.writeLong(3); // entry id
+        newEntry3.writeBytes("new-entry-3".getBytes());
+        long location = entryLogger.addEntry(4, newEntry3, false);
+
+        // Point the index of entry 3 to the location of the rewritten copy
+        List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
+        storage.updateEntriesLocations(locations);
+
+        ByteBuf res = storage.getEntry(4, 3);
+        System.out.println("res: " + ByteBufUtil.hexDump(res));
+        System.out.println("newEntry3: " + ByteBufUtil.hexDump(newEntry3));
+        assertEquals(newEntry3, res);
+    }
+
+    /**
+     * Checks that creating a bookie with {@link DbLedgerStorage} and two ledger directories fails
+     * with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void doubleDirectoryError() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { "dir1", "dir2" });
+
+        try {
+            new Bookie(conf);
+            fail("Should have failed because of the 2 directories");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+    }
+
+    /**
+     * Adds the same (ledger, entry) twice with different payloads, flushing after each add, and
+     * checks that a read returns the payload written last.
+     */
+    @Test
+    public void testRewritingEntries() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        // Nothing was written yet, so reading entry -1 must fail
+        try {
+            storage.getEntry(1, -1);
+            fail("Should throw exception");
+        } catch (Bookie.NoEntryException e) {
+            // ok
+        }
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+        storage.flush();
+
+        // Same ledger id and entry id as above, different payload
+        ByteBuf newEntry1 = Unpooled.buffer(1024);
+        newEntry1.writeLong(1); // ledger id
+        newEntry1.writeLong(1); // entry id
+        newEntry1.writeBytes("new-entry-1".getBytes());
+
+        storage.addEntry(newEntry1);
+        storage.flush();
+
+        ByteBuf response = storage.getEntry(1, 1);
+        assertEquals(newEntry1, response);
+    }
+
+ @Test
+ public void testEntriesOutOfOrder() throws Exception {
+ storage.setMasterKey(1, "key".getBytes());
+
+ ByteBuf entry2 = Unpooled.buffer(1024);
+ entry2.writeLong(1); // ledger id
+ entry2.writeLong(2); // entry id
+ entry2.writeBytes("entry-2".getBytes());
+
+ storage.addEntry(entry2);
+
+ try {
+ storage.getEntry(1, 1);
+ fail("Entry doesn't exist");
+ } catch (NoEntryException e) {
+ // Ok, entry doesn't exist
+ }
+
+ ByteBuf res = storage.getEntry(1, 2);
+ assertEquals(entry2, res);
+
+ ByteBuf entry1 = Unpooled.buffer(1024);
+ entry1.writeLong(1); // ledger id
+ entry1.writeLong(1); // entry id
+ entry1.writeBytes("entry-1".getBytes());
+
+ storage.addEntry(entry1);
+
+ res = storage.getEntry(1, 1);
+ assertEquals(entry1, res);
+
+ res = storage.getEntry(1, 2);
+ assertEquals(entry2, res);
+
+ storage.flush();
+
+ res = storage.getEntry(1, 1);
+ assertEquals(entry1, res);
+
+ res = storage.getEntry(1, 2);
+ assertEquals(entry2, res);
+ }
+
+ @Test
+ public void testEntriesOutOfOrderWithFlush() throws Exception {
+ storage.setMasterKey(1, "key".getBytes());
+
+ ByteBuf entry2 = Unpooled.buffer(1024);
+ entry2.writeLong(1); // ledger id
+ entry2.writeLong(2); // entry id
+ entry2.writeBytes("entry-2".getBytes());
+
+ storage.addEntry(entry2);
+
+ try {
+ storage.getEntry(1, 1);
+ fail("Entry doesn't exist");
+ } catch (NoEntryException e) {
+ // Ok, entry doesn't exist
+ }
+
+ ByteBuf res = storage.getEntry(1, 2);
+ assertEquals(entry2, res);
+ res.release();
+
+ storage.flush();
+
+ try {
+ storage.getEntry(1, 1);
+ fail("Entry doesn't exist");
+ } catch (NoEntryException e) {
+ // Ok, entry doesn't exist
+ }
+
+ res = storage.getEntry(1, 2);
+ assertEquals(entry2, res);
+ res.release();
+
+ ByteBuf entry1 = Unpooled.buffer(1024);
+ entry1.writeLong(1); // ledger id
+ entry1.writeLong(1); // entry id
+ entry1.writeBytes("entry-1".getBytes());
+
+ storage.addEntry(entry1);
+
+ res = storage.getEntry(1, 1);
+ assertEquals(entry1, res);
+ res.release();
+
+ res = storage.getEntry(1, 2);
+ assertEquals(entry2, res);
+ res.release();
+
+ storage.flush();
+
+ res = storage.getEntry(1, 1);
+ assertEquals(entry1, res);
+ res.release();
+
+ res = storage.getEntry(1, 2);
+ assertEquals(entry2, res);
+ res.release();
+ }
+
+ @Test
+ public void testAddEntriesAfterDelete() throws Exception {
+ storage.setMasterKey(1, "key".getBytes());
+
+ ByteBuf entry0 = Unpooled.buffer(1024);
+ entry0.writeLong(1); // ledger id
+ entry0.writeLong(0); // entry id
+ entry0.writeBytes("entry-0".getBytes());
+
+ ByteBuf entry1 = Unpooled.buffer(1024);
+ entry1.writeLong(1); // ledger id
+ entry1.writeLong(1); // entry id
+ entry1.writeBytes("entry-1".getBytes());
+
+ storage.addEntry(entry0);
+ storage.addEntry(entry1);
+
+ storage.flush();
+
+ storage.deleteLedger(1);
+
+ storage.setMasterKey(1, "key".getBytes());
+
+ entry0 = Unpooled.buffer(1024);
+ entry0.writeLong(1); // ledger id
+ entry0.writeLong(0); // entry id
+ entry0.writeBytes("entry-0".getBytes());
+
+ entry1 = Unpooled.buffer(1024);
+ entry1.writeLong(1); // ledger id
+ entry1.writeLong(1); // entry id
+ entry1.writeBytes("entry-1".getBytes());
+
+ storage.addEntry(entry0);
+ storage.addEntry(entry1);
+
+ assertEquals(entry0, storage.getEntry(1, 0));
+ assertEquals(entry1, storage.getEntry(1, 1));
+
+ storage.flush();
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
new file mode 100644
index 0000000..0d3f5bb
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for the write cache handling of {@link DbLedgerStorage}.
+ *
+ * <p>The bookie is configured with a {@link DbLedgerStorage} subclass whose {@code flush()} only swaps
+ * the two write caches and then stalls, so that the test can keep adding entries until
+ * {@code addEntry} is rejected with an {@link IOException}.
+ */
+public class DbLedgerStorageWriteCacheTest {
+
+    /** Payload size, in bytes, of each entry written by the test (excluding the two id fields). */
+    private static final int ENTRY_PAYLOAD_SIZE = 100 * 1024;
+
+    private DbLedgerStorage storage;
+    private File tmpDir;
+
+    /**
+     * Storage whose flush never writes anything: it swaps the write caches, re-enables flush
+     * triggering and then holds the flush mutex for one second.
+     */
+    private static class MockedDbLedgerStorage extends DbLedgerStorage {
+
+        @Override
+        public void flush() throws IOException {
+            flushMutex.lock();
+            try {
+                // Swap the write caches and block for a while to simulate a slow disk
+                WriteCache tmp = writeCacheBeingFlushed;
+                writeCacheBeingFlushed = writeCache;
+                writeCache = tmp;
+
+                // since the cache is switched, we can allow flush to be triggered
+                hasFlushBeenTriggered.set(false);
+
+                // Block the flushing thread
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt status visible to whoever is running this thread
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+
+    }
+
+    /**
+     * Creates a bookie on a fresh temporary directory, using {@link MockedDbLedgerStorage} with the
+     * write cache size property set to 1 (MB), and keeps a reference to its ledger storage.
+     */
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName());
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    /**
+     * Shuts the storage down and removes the temporary directory with everything in it.
+     */
+    @After
+    public void teardown() throws Exception {
+        // JUnit runs @After even when @Before failed, so either field may still be null here
+        try {
+            if (storage != null) {
+                storage.shutdown();
+            }
+        } finally {
+            if (tmpDir != null) {
+                // File.delete() alone cannot remove a directory that still has content
+                deleteRecursively(tmpDir);
+            }
+        }
+    }
+
+    /**
+     * Keeps adding 100 KB entries and expects the add following the first ten to be rejected.
+     */
+    @Test
+    public void writeCacheFull() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+        assertEquals(false, storage.isFenced(4));
+        assertEquals(true, storage.ledgerExists(4));
+
+        assertEquals("key", new String(storage.readMasterKey(4)));
+
+        // Add enough entries to fill the 1st write cache
+        for (int i = 0; i < 5; i++) {
+            storage.addEntry(newEntry(4, i));
+        }
+
+        // Same amount of data again, with entry ids 5..9
+        for (int i = 0; i < 5; i++) {
+            storage.addEntry(newEntry(4, 5 + i));
+        }
+
+        // Next add should fail for cache full
+        try {
+            storage.addEntry(newEntry(4, 22));
+            fail("Should have thrown exception");
+        } catch (IOException e) {
+            // Expected
+        }
+    }
+
+    /**
+     * Builds an entry made of the ledger id, the entry id and {@link #ENTRY_PAYLOAD_SIZE} zero bytes.
+     *
+     * @param ledgerId ledger id written as the first long of the buffer
+     * @param entryId entry id written as the second long of the buffer
+     * @return a new unpooled buffer holding the entry
+     */
+    private static ByteBuf newEntry(long ledgerId, long entryId) {
+        ByteBuf entry = Unpooled.buffer(ENTRY_PAYLOAD_SIZE + 2 * 8);
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeZero(ENTRY_PAYLOAD_SIZE);
+        return entry;
+    }
+
+    /**
+     * Best-effort removal of a file or of a directory tree; failures are ignored.
+     *
+     * @param file file or directory to remove
+     */
+    private static void deleteRecursively(File file) {
+        File[] children = file.listFiles(); // null when this is not a directory
+        if (children != null) {
+            for (File child : children) {
+                deleteRecursively(child);
+            }
+        }
+        file.delete();
+    }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
new file mode 100644
index 0000000..6e83ffd
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link EntryLocationIndex}.
+ *
+ * <p>Each test works on its own index, backed by RocksDB in a fresh temporary directory. The index
+ * is closed and the directory removed even when an assertion fails.
+ */
+public class EntryLocationIndexTest {
+
+    private final ServerConfiguration serverConfiguration = new ServerConfiguration();
+
+    /**
+     * Deleting a ledger leaves its locations readable until {@code removeOffsetFromDeletedLedgers()}
+     * is called; afterwards they read as 0, while the other ledgers are untouched.
+     */
+    @Test
+    public void deleteLedgerTest() throws Exception {
+        File tmpDir = createTempDir();
+        EntryLocationIndex idx = null;
+        try {
+            idx = newIndex(tmpDir);
+
+            // Add some dummy indexes
+            idx.addLocation(40312, 0, 1);
+            idx.addLocation(40313, 10, 2);
+            idx.addLocation(40320, 0, 3);
+
+            // Add more indexes in a different batch
+            idx.addLocation(40313, 11, 5);
+            idx.addLocation(40313, 12, 6);
+            idx.addLocation(40320, 1, 7);
+            idx.addLocation(40312, 3, 4);
+
+            idx.delete(40313);
+
+            assertEquals(1, idx.getLocation(40312, 0));
+            assertEquals(4, idx.getLocation(40312, 3));
+            assertEquals(3, idx.getLocation(40320, 0));
+            assertEquals(7, idx.getLocation(40320, 1));
+
+            // The deleted ledger is still visible at this point
+            assertEquals(2, idx.getLocation(40313, 10));
+            assertEquals(5, idx.getLocation(40313, 11));
+            assertEquals(6, idx.getLocation(40313, 12));
+
+            idx.removeOffsetFromDeletedLedgers();
+
+            // After flush the keys will be removed
+            assertEquals(0, idx.getLocation(40313, 10));
+            assertEquals(0, idx.getLocation(40313, 11));
+            assertEquals(0, idx.getLocation(40313, 12));
+        } finally {
+            closeAndCleanup(idx, tmpDir);
+        }
+    }
+
+    /**
+     * This tests if a ledger is added after it has been deleted: locations added between
+     * {@code delete()} and {@code removeOffsetFromDeletedLedgers()} are expected to read as 0 too.
+     */
+    @Test
+    public void addLedgerAfterDeleteTest() throws Exception {
+        File tmpDir = createTempDir();
+        EntryLocationIndex idx = null;
+        try {
+            idx = newIndex(tmpDir);
+
+            // Add some dummy indexes
+            idx.addLocation(40312, 0, 1);
+            idx.addLocation(40313, 10, 2);
+            idx.addLocation(40320, 0, 3);
+
+            idx.delete(40313);
+
+            // Add more indexes in a different batch
+            idx.addLocation(40313, 11, 5);
+            idx.addLocation(40313, 12, 6);
+            idx.addLocation(40320, 1, 7);
+            idx.addLocation(40312, 3, 4);
+
+            idx.removeOffsetFromDeletedLedgers();
+
+            assertEquals(0, idx.getLocation(40313, 11));
+            assertEquals(0, idx.getLocation(40313, 12));
+        } finally {
+            closeAndCleanup(idx, tmpDir);
+        }
+    }
+
+    /**
+     * Creates an empty temporary directory by turning a freshly reserved temp file name into a
+     * directory.
+     */
+    private static File createTempDir() throws Exception {
+        File dir = File.createTempFile("bkTest", ".dir");
+        dir.delete();
+        dir.mkdir();
+        return dir;
+    }
+
+    /**
+     * Opens an index stored under {@code dir}, using the RocksDB storage factory and no stats.
+     */
+    private EntryLocationIndex newIndex(File dir) throws Exception {
+        return new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
+                dir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+    }
+
+    /**
+     * Closes the index, when it was opened, and then removes the temporary directory.
+     *
+     * @param idx index to close, or {@code null} if it could not be created
+     * @param dir temporary directory to remove
+     */
+    private static void closeAndCleanup(EntryLocationIndex idx, File dir) throws Exception {
+        try {
+            if (idx != null) {
+                idx.close();
+            }
+        } finally {
+            // deleteOnExit() would leave the directory behind, since it is not empty by now
+            deleteRecursively(dir);
+        }
+    }
+
+    /**
+     * Best-effort removal of a file or of a directory tree; failures are ignored.
+     */
+    private static void deleteRecursively(File file) {
+        File[] children = file.listFiles(); // null when this is not a directory
+        if (children != null) {
+            for (File child : children) {
+                deleteRecursively(child);
+            }
+        }
+        file.delete();
+    }
+}
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 2cc5ce7..736c88b 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -26,6 +26,10 @@
</Match>
<Match>
<!-- generated code, we can't be held responsible for findbugs in it //-->
+ <Class
name="~org\.apache\.bookkeeper\.bookie\.storage\.ldb\.DbLedgerStorageDataFormats.*"
/>
+ </Match>
+ <Match>
+ <!-- generated code, we can't be held responsible for findbugs in it //-->
<Class name="~org\.apache\.bookkeeper\.tests\.generated.*" />
</Match>
<Match>
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].