This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 89ac2a2 DbLedgerStorage -- Added bookie shell tools
89ac2a2 is described below
commit 89ac2a2b7b7883f0b48978bf3dc3802ecf0fc6c1
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Dec 15 11:54:06 2017 -0800
DbLedgerStorage -- Added bookie shell tools
Added BookieShell tools for DbLedgerStorage conversion operations:
* `convert-to-db-storage` --> Convert interleaved indexes to db storage
indexes
* `convert-to-interleaved-storage` --> Convert back from db storage to
interleaved storage indexes
* `rebuild-db-ledger-locations-index` --> Rebuild the DbLedgerStorage
index by scanning the entry log files.
Author: Matteo Merli <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #859 from merlimat/db-ledger-storage-itemized
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 308 ++++++++++++++++++++-
.../bookie/storage/ldb/DbLedgerStorage.java | 8 +-
.../storage/ldb/LocationsIndexRebuildOp.java | 139 ++++++++++
.../bookie/storage/ldb/ConversionRollbackTest.java | 145 ++++++++++
.../bookie/storage/ldb/ConversionTest.java | 158 +++++++++++
.../storage/ldb/LocationsIndexRebuildTest.java | 148 ++++++++++
6 files changed, 901 insertions(+), 5 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index f8946c1..ccffb25 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Charsets.UTF_8;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractFuture;
import io.netty.buffer.ByteBuf;
@@ -33,6 +34,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -59,8 +61,12 @@ import java.util.function.Predicate;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
+import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -142,6 +148,9 @@ public class BookieShell implements Tool {
static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
static final String CMD_LOSTBOOKIERECOVERYDELAY =
"lostbookierecoverydelay";
static final String CMD_TRIGGERAUDIT = "triggeraudit";
+ static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
+ static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE =
"convert-to-interleaved-storage";
+ static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX =
"rebuild-db-ledger-locations-index";
static final String CMD_HELP = "help";
final ServerConfiguration bkConf = new ServerConfiguration();
@@ -1879,7 +1888,6 @@ public class BookieShell implements Tool {
}
return 0;
}
-
}
/**
@@ -2113,6 +2121,241 @@ public class BookieShell implements Tool {
void progress(long updated, long issued);
}
+
+ /**
+ * Convert bookie indexes from InterleavedStorage to DbLedgerStorage
format.
+ */
+ class ConvertToDbStorageCmd extends MyCommand {
+ Options opts = new Options();
+
+ public ConvertToDbStorageCmd() {
+ super(CMD_CONVERT_TO_DB_STORAGE);
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Convert bookie indexes from InterleavedStorage to
DbLedgerStorage format";
+ }
+
+ String getUsage() {
+ return CMD_CONVERT_TO_DB_STORAGE;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ LOG.info("=== Converting to DbLedgerStorage ===");
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ LedgerDirsManager ledgerDirsManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+ LedgerDirsManager ledgerIndexManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint,
boolean compact)
+ throws IOException {
+ }
+ };
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+
+ int convertedLedgers = 0;
+ for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0,
Long.MAX_VALUE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting ledger {}", ledgerId);
+ }
+
+ FileInfo fi = getFileInfo(ledgerId);
+
+ Iterable<SortedMap<Long, Long>> entries =
getLedgerIndexEntries(ledgerId);
+
+ long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId,
fi.isFenced(), fi.getMasterKey(), entries);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" -- done. fenced={} entries={}",
fi.isFenced(), numberOfEntries);
+ }
+
+ // Remove index from old storage
+ interleavedStorage.deleteLedger(ledgerId);
+
+ if (++convertedLedgers % 1000 == 0) {
+ LOG.info("Converted {} ledgers", convertedLedgers);
+ }
+ }
+
+ dbStorage.shutdown();
+ interleavedStorage.shutdown();
+
+ LOG.info("---- Done Converting ----");
+ return 0;
+ }
+ }
+
+ /**
+ * Convert bookie indexes from DbLedgerStorage to InterleavedStorage
format.
+ */
+ class ConvertToInterleavedStorageCmd extends MyCommand {
+ Options opts = new Options();
+
+ public ConvertToInterleavedStorageCmd() {
+ super(CMD_CONVERT_TO_INTERLEAVED_STORAGE);
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Convert bookie indexes from DbLedgerStorage to
InterleavedStorage format";
+ }
+
+ String getUsage() {
+ return CMD_CONVERT_TO_INTERLEAVED_STORAGE;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ LOG.info("=== Converting DbLedgerStorage ===");
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ LedgerDirsManager ledgerDirsManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+ LedgerDirsManager ledgerIndexManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint,
boolean compact)
+ throws IOException {
+ }
+ };
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ LedgerCache interleavedLedgerCache =
interleavedStorage.ledgerCache;
+
+ EntryLocationIndex dbEntryLocationIndex =
dbStorage.getEntryLocationIndex();
+
+ int convertedLedgers = 0;
+ for (long ledgerId : dbStorage.getActiveLedgersInRange(0,
Long.MAX_VALUE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting ledger {}", ledgerId);
+ }
+
+ interleavedStorage.setMasterKey(ledgerId,
dbStorage.readMasterKey(ledgerId));
+ if (dbStorage.isFenced(ledgerId)) {
+ interleavedStorage.setFenced(ledgerId);
+ }
+
+ long lastEntryInLedger =
dbEntryLocationIndex.getLastEntryInLedger(ledgerId);
+ for (long entryId = 0; entryId <= lastEntryInLedger;
entryId++) {
+ try {
+ long location =
dbEntryLocationIndex.getLocation(ledgerId, entryId);
+ if (location != 0L) {
+ interleavedLedgerCache.putEntryOffset(ledgerId,
entryId, location);
+ }
+ } catch (Bookie.NoEntryException e) {
+ // Ignore entry
+ }
+ }
+
+ if (++convertedLedgers % 1000 == 0) {
+ LOG.info("Converted {} ledgers", convertedLedgers);
+ }
+ }
+
+ dbStorage.shutdown();
+
+ interleavedLedgerCache.flushLedger(true);
+ interleavedStorage.flush();
+ interleavedStorage.shutdown();
+
+ String baseDir =
ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+ // Rename databases and keep backup
+ Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
+ FileSystems.getDefault().getPath(baseDir,
"ledgers.backup"));
+
+ Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
+ FileSystems.getDefault().getPath(baseDir,
"locations.backup"));
+
+ LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
+ return 0;
+ }
+ }
+
+ /**
+ * Rebuild DbLedgerStorage locations index.
+ */
+ class RebuildDbLedgerLocationsIndexCmd extends MyCommand {
+ Options opts = new Options();
+
+ public RebuildDbLedgerLocationsIndexCmd() {
+ super(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX);
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Rebuild DbLedgerStorage locations index by scanning the
entry logs";
+ }
+
+ String getUsage() {
+ return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ LOG.info("=== Rebuilding bookie index ===");
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ new LocationsIndexRebuildOp(conf).initiate();
+ LOG.info("-- Done rebuilding bookie index --");
+ return 0;
+ }
+ }
+
final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
{
commands.put(CMD_METAFORMAT, new MetaFormatCmd());
@@ -2138,6 +2381,9 @@ public class BookieShell implements Tool {
commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
+ commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
+ commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new
ConvertToInterleavedStorageCmd());
+ commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new
RebuildDbLedgerLocationsIndexCmd());
commands.put(CMD_HELP, new HelpCmd());
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new
LostBookieRecoveryDelayCmd());
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
@@ -2433,6 +2679,66 @@ public class BookieShell implements Tool {
}
/**
+ * Get an iterable over pages of entries and locations for a ledger.
+ *
+ * @param ledgerId
+ * @return
+ * @throws IOException
+ */
+ protected Iterable<SortedMap<Long, Long>> getLedgerIndexEntries(final long
ledgerId) throws IOException {
+ final FileInfo fi = getFileInfo(ledgerId);
+ final long size = fi.size();
+
+ final LedgerEntryPage lep = new LedgerEntryPage(pageSize,
entriesPerPage);
+ lep.usePage();
+
+ final Iterator<SortedMap<Long, Long>> iterator = new
Iterator<SortedMap<Long, Long>>() {
+ long curSize = 0;
+ long curEntry = 0;
+
+ @Override
+ public boolean hasNext() {
+ return curSize < size;
+ }
+
+ @Override
+ public SortedMap<Long, Long> next() {
+ SortedMap<Long, Long> entries = Maps.newTreeMap();
+ lep.setLedgerAndFirstEntry(ledgerId, curEntry);
+ try {
+ lep.readPage(fi);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // process a page
+ for (int i = 0; i < entriesPerPage; i++) {
+ long offset = lep.getOffset(i * 8);
+ if (offset != 0) {
+ entries.put(curEntry, offset);
+ }
+ ++curEntry;
+ }
+
+ curSize += pageSize;
+ return entries;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("Cannot remove");
+ }
+
+ };
+
+ return new Iterable<SortedMap<Long, Long>>() {
+ public Iterator<SortedMap<Long, Long>> iterator() {
+ return iterator;
+ }
+ };
+ }
+
+ /**
* Scan over an entry log file.
*
* @param logId
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index fa80016..b0257ec 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -30,8 +30,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
import java.util.SortedMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
@@ -53,9 +51,11 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -714,8 +714,8 @@ public class DbLedgerStorage implements
CompactableLedgerStorage {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
- throws IOException {
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId, long
previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws
IOException {
throw new UnsupportedOperationException();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
new file mode 100644
index 0000000..6cf6232
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan all entries in the entry log and rebuild the ledgerStorageIndex.
+ */
+public class LocationsIndexRebuildOp {
+ private final ServerConfiguration conf;
+
+ public LocationsIndexRebuildOp(ServerConfiguration conf) {
+ this.conf = conf;
+ }
+
+ public void initiate() throws IOException {
+ LOG.info("Starting index rebuilding");
+
+ // Move locations index to a backup directory
+ String basePath =
Bookie.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
+ Path currentPath = FileSystems.getDefault().getPath(basePath,
"locations");
+ String timestamp = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+ Path backupPath = FileSystems.getDefault().getPath(basePath,
"locations.BACKUP-" + timestamp);
+ Files.move(currentPath, backupPath);
+
+ LOG.info("Created locations index backup at {}", backupPath);
+
+ long startTime = System.nanoTime();
+
+ EntryLogger entryLogger = new EntryLogger(conf, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold())));
+ Set<Long> entryLogs = entryLogger.getEntryLogsSet();
+
+ String locationsDbPath = FileSystems.getDefault().getPath(basePath,
"locations").toFile().toString();
+
+ Set<Long> activeLedgers = getActiveLedgers(conf,
KeyValueStorageRocksDB.factory, basePath);
+ LOG.info("Found {} active ledgers in ledger manager",
activeLedgers.size());
+
+ KeyValueStorage newIndex =
KeyValueStorageRocksDB.factory.newKeyValueStorage(locationsDbPath,
DbConfigType.Huge,
+ conf);
+
+ int totalEntryLogs = entryLogs.size();
+ int completedEntryLogs = 0;
+ LOG.info("Scanning {} entry logs", totalEntryLogs);
+
+ for (long entryLogId : entryLogs) {
+ entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
+ @Override
+ public void process(long ledgerId, long offset, ByteBuf entry)
throws IOException {
+ long entryId = entry.getLong(8);
+
+ // Actual location indexed is pointing past the entry size
+ long location = (entryLogId << 32L) | (offset + 4);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Rebuilding {}:{} at location {} / {}",
ledgerId, entryId, location >> 32,
+ location & (Integer.MAX_VALUE - 1));
+ }
+
+ // Update the ledger index page
+ LongPairWrapper key = LongPairWrapper.get(ledgerId,
entryId);
+ LongWrapper value = LongWrapper.get(location);
+ newIndex.put(key.array, value.array);
+ }
+
+ @Override
+ public boolean accept(long ledgerId) {
+ return activeLedgers.contains(ledgerId);
+ }
+ });
+
+ ++completedEntryLogs;
+ LOG.info("Completed scanning of log {}.log -- {} / {}",
Long.toHexString(entryLogId), completedEntryLogs,
+ totalEntryLogs);
+ }
+
+ newIndex.sync();
+ newIndex.close();
+
+ LOG.info("Rebuilding index is done. Total time: {}",
+
DurationFormatUtils.formatDurationHMS(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
- startTime)));
+ }
+
+ private Set<Long> getActiveLedgers(ServerConfiguration conf,
KeyValueStorageFactory storageFactory, String basePath)
+ throws IOException {
+ LedgerMetadataIndex ledgers = new LedgerMetadataIndex(conf,
storageFactory, basePath, NullStatsLogger.INSTANCE);
+ Set<Long> activeLedgers = Sets.newHashSet();
+ for (Long ledger : ledgers.getActiveLedgersInRange(0, Long.MAX_VALUE))
{
+ activeLedgers.add(ledger);
+ }
+
+ ledgers.close();
+ return activeLedgers;
+ }
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LocationsIndexRebuildOp.class);
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
new file mode 100644
index 0000000..a954f28
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for BookieShell convert-to-interleaved-storage command.
+ */
+@Slf4j
+public class ConversionRollbackTest {
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ @Test
+ public void convertFromDbStorageToInterleaved() throws Exception {
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ log.info("Using temp directory: {}", tmpDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ conf.setAllowLoopback(true);
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ // Insert some ledger & entries in the dbStorage
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ dbStorage.setMasterKey(ledgerId, ("ledger-" +
ledgerId).getBytes());
+ dbStorage.setFenced(ledgerId);
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ dbStorage.addEntry(entry);
+ }
+ }
+
+ dbStorage.flush();
+ dbStorage.shutdown();
+
+ // Run conversion tool
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(new String[] { "convert-to-interleaved-storage" });
+
+ Assert.assertEquals(0, res);
+
+ // Verify that interleaved storage index has the same entries
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ Set<Long> ledgers =
Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
+
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ Assert.assertEquals(true, interleavedStorage.isFenced(ledgerId));
+ Assert.assertEquals("ledger-" + ledgerId, new
String(interleavedStorage.readMasterKey(ledgerId)));
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ByteBuf result = interleavedStorage.getEntry(ledgerId,
entryId);
+ Assert.assertEquals(entry, result);
+ }
+ }
+
+ interleavedStorage.shutdown();
+ FileUtils.forceDelete(tmpDir);
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
new file mode 100644
index 0000000..df5434c
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for BookieShell convert-to-db-storage command.
+ */
+public class ConversionTest {
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ @Test
+ public void test() throws Exception {
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ System.out.println(tmpDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ conf.setAllowLoopback(true);
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ // Insert some ledger & entries in the interleaved storage
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ interleavedStorage.setMasterKey(ledgerId, ("ledger-" +
ledgerId).getBytes());
+ interleavedStorage.setFenced(ledgerId);
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ interleavedStorage.addEntry(entry);
+ }
+ }
+
+ interleavedStorage.flush();
+ interleavedStorage.shutdown();
+
+ // Run conversion tool
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(new String[] { "convert-to-db-storage" });
+
+ Assert.assertEquals(0, res);
+
+ // Verify that db index has the same entries
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ interleavedStorage = new InterleavedLedgerStorage();
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ Set<Long> ledgers =
Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
+
+ ledgers =
Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(), ledgers);
+
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ Assert.assertEquals(true, dbStorage.isFenced(ledgerId));
+ Assert.assertEquals("ledger-" + ledgerId, new
String(dbStorage.readMasterKey(ledgerId)));
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ByteBuf result = dbStorage.getEntry(ledgerId, entryId);
+ Assert.assertEquals(entry, result);
+ result.release();
+
+ try {
+ interleavedStorage.getEntry(ledgerId, entryId);
+ Assert.fail("entry should not exist");
+ } catch (NoLedgerException e) {
+ // Ok
+ }
+ }
+ }
+
+ interleavedStorage.shutdown();
+ dbStorage.shutdown();
+ FileUtils.forceDelete(tmpDir);
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
new file mode 100644
index 0000000..4793ec7
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for class {@link LocationsIndexRebuildOp}.
+ */
+public class LocationsIndexRebuildTest {
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ @Test
+ public void test() throws Exception {
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ System.out.println(tmpDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setAllowLoopback(true);
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+
+ DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ // Insert some ledger & entries in the storage
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ ledgerStorage.setMasterKey(ledgerId, ("ledger-" +
ledgerId).getBytes());
+ ledgerStorage.setFenced(ledgerId);
+
+ for (long entryId = 0; entryId < 100; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ledgerStorage.addEntry(entry);
+ }
+ }
+
+ ledgerStorage.flush();
+ ledgerStorage.shutdown();
+
+ // Rebuild index through the tool
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(new String[] { "rebuild-db-ledger-locations-index"
});
+
+ Assert.assertEquals(0, res);
+
+ // Verify that db index has the same entries
+ ledgerStorage = new DbLedgerStorage();
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ Set<Long> ledgers =
Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
+
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ Assert.assertEquals(true, ledgerStorage.isFenced(ledgerId));
+ Assert.assertEquals("ledger-" + ledgerId, new
String(ledgerStorage.readMasterKey(ledgerId)));
+
+ ByteBuf lastEntry = ledgerStorage.getLastEntry(ledgerId);
+ assertEquals(ledgerId, lastEntry.readLong());
+ long lastEntryId = lastEntry.readLong();
+ assertEquals(99, lastEntryId);
+
+ for (long entryId = 0; entryId < 100; entryId++) {
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ByteBuf result = ledgerStorage.getEntry(ledgerId, entryId);
+ Assert.assertEquals(entry, result);
+ }
+ }
+
+ ledgerStorage.shutdown();
+ FileUtils.forceDelete(tmpDir);
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].