sijie closed pull request #859: DbLedgerStorage -- Added bookie shell tools
URL: https://github.com/apache/bookkeeper/pull/859
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index f8946c1f6..ccffb25f7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractFuture;
import io.netty.buffer.ByteBuf;
@@ -33,6 +34,7 @@
import java.io.Serializable;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -59,8 +61,12 @@
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
+import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -142,6 +148,9 @@
static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
static final String CMD_LOSTBOOKIERECOVERYDELAY =
"lostbookierecoverydelay";
static final String CMD_TRIGGERAUDIT = "triggeraudit";
+ static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
+ static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE =
"convert-to-interleaved-storage";
+ static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX =
"rebuild-db-ledger-locations-index";
static final String CMD_HELP = "help";
final ServerConfiguration bkConf = new ServerConfiguration();
@@ -1879,7 +1888,6 @@ public void progress(long updated, long issued) {
}
return 0;
}
-
}
/**
@@ -2113,6 +2121,241 @@ public int runCmd(CommandLine cmdLine) throws Exception
{
void progress(long updated, long issued);
}
+
+ /**
+ * Convert bookie indexes from InterleavedStorage to DbLedgerStorage
format.
+ */
+ class ConvertToDbStorageCmd extends MyCommand {
+ Options opts = new Options();
+
+ public ConvertToDbStorageCmd() {
+ super(CMD_CONVERT_TO_DB_STORAGE);
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Convert bookie indexes from InterleavedStorage to
DbLedgerStorage format";
+ }
+
+ String getUsage() {
+ return CMD_CONVERT_TO_DB_STORAGE;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ LOG.info("=== Converting to DbLedgerStorage ===");
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ LedgerDirsManager ledgerDirsManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+ LedgerDirsManager ledgerIndexManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint,
boolean compact)
+ throws IOException {
+ }
+ };
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+
+ int convertedLedgers = 0;
+ for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0,
Long.MAX_VALUE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting ledger {}", ledgerId);
+ }
+
+ FileInfo fi = getFileInfo(ledgerId);
+
+ Iterable<SortedMap<Long, Long>> entries =
getLedgerIndexEntries(ledgerId);
+
+ long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId,
fi.isFenced(), fi.getMasterKey(), entries);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" -- done. fenced={} entries={}",
fi.isFenced(), numberOfEntries);
+ }
+
+ // Remove index from old storage
+ interleavedStorage.deleteLedger(ledgerId);
+
+ if (++convertedLedgers % 1000 == 0) {
+ LOG.info("Converted {} ledgers", convertedLedgers);
+ }
+ }
+
+ dbStorage.shutdown();
+ interleavedStorage.shutdown();
+
+ LOG.info("---- Done Converting ----");
+ return 0;
+ }
+ }
+
+ /**
+ * Convert bookie indexes from DbLedgerStorage to InterleavedStorage
format.
+ */
+ class ConvertToInterleavedStorageCmd extends MyCommand {
+ Options opts = new Options();
+
+ public ConvertToInterleavedStorageCmd() {
+ super(CMD_CONVERT_TO_INTERLEAVED_STORAGE);
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Convert bookie indexes from DbLedgerStorage to
InterleavedStorage format";
+ }
+
+ String getUsage() {
+ return CMD_CONVERT_TO_INTERLEAVED_STORAGE;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ LOG.info("=== Converting DbLedgerStorage ===");
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ LedgerDirsManager ledgerDirsManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+ LedgerDirsManager ledgerIndexManager = new
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()));
+
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint,
boolean compact)
+ throws IOException {
+ }
+ };
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ LedgerCache interleavedLedgerCache =
interleavedStorage.ledgerCache;
+
+ EntryLocationIndex dbEntryLocationIndex =
dbStorage.getEntryLocationIndex();
+
+ int convertedLedgers = 0;
+ for (long ledgerId : dbStorage.getActiveLedgersInRange(0,
Long.MAX_VALUE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting ledger {}", ledgerId);
+ }
+
+ interleavedStorage.setMasterKey(ledgerId,
dbStorage.readMasterKey(ledgerId));
+ if (dbStorage.isFenced(ledgerId)) {
+ interleavedStorage.setFenced(ledgerId);
+ }
+
+ long lastEntryInLedger =
dbEntryLocationIndex.getLastEntryInLedger(ledgerId);
+ for (long entryId = 0; entryId <= lastEntryInLedger;
entryId++) {
+ try {
+ long location =
dbEntryLocationIndex.getLocation(ledgerId, entryId);
+ if (location != 0L) {
+ interleavedLedgerCache.putEntryOffset(ledgerId,
entryId, location);
+ }
+ } catch (Bookie.NoEntryException e) {
+ // Ignore entry
+ }
+ }
+
+ if (++convertedLedgers % 1000 == 0) {
+ LOG.info("Converted {} ledgers", convertedLedgers);
+ }
+ }
+
+ dbStorage.shutdown();
+
+ interleavedLedgerCache.flushLedger(true);
+ interleavedStorage.flush();
+ interleavedStorage.shutdown();
+
+ String baseDir =
ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+ // Rename databases and keep backup
+ Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
+ FileSystems.getDefault().getPath(baseDir,
"ledgers.backup"));
+
+ Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
+ FileSystems.getDefault().getPath(baseDir,
"locations.backup"));
+
+ LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
+ return 0;
+ }
+ }
+
+ /**
+ * Rebuild DbLedgerStorage locations index.
+ */
+ class RebuildDbLedgerLocationsIndexCmd extends MyCommand {
+ Options opts = new Options();
+
+ public RebuildDbLedgerLocationsIndexCmd() {
+ super(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX);
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Rebuild DbLedgerStorage locations index by scanning the
entry logs";
+ }
+
+ String getUsage() {
+ return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ LOG.info("=== Rebuilding bookie index ===");
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ new LocationsIndexRebuildOp(conf).initiate();
+ LOG.info("-- Done rebuilding bookie index --");
+ return 0;
+ }
+ }
+
final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
{
commands.put(CMD_METAFORMAT, new MetaFormatCmd());
@@ -2138,6 +2381,9 @@ public int runCmd(CommandLine cmdLine) throws Exception {
commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
+ commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
+ commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new
ConvertToInterleavedStorageCmd());
+ commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new
RebuildDbLedgerLocationsIndexCmd());
commands.put(CMD_HELP, new HelpCmd());
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new
LostBookieRecoveryDelayCmd());
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
@@ -2432,6 +2678,66 @@ protected void readLedgerIndexEntries(long ledgerId)
throws IOException {
}
}
+ /**
+ * Get an iterable over pages of entries and locations for a ledger.
+ *
+ * @param ledgerId
+ * @return
+ * @throws IOException
+ */
+ protected Iterable<SortedMap<Long, Long>> getLedgerIndexEntries(final long
ledgerId) throws IOException {
+ final FileInfo fi = getFileInfo(ledgerId);
+ final long size = fi.size();
+
+ final LedgerEntryPage lep = new LedgerEntryPage(pageSize,
entriesPerPage);
+ lep.usePage();
+
+ final Iterator<SortedMap<Long, Long>> iterator = new
Iterator<SortedMap<Long, Long>>() {
+ long curSize = 0;
+ long curEntry = 0;
+
+ @Override
+ public boolean hasNext() {
+ return curSize < size;
+ }
+
+ @Override
+ public SortedMap<Long, Long> next() {
+ SortedMap<Long, Long> entries = Maps.newTreeMap();
+ lep.setLedgerAndFirstEntry(ledgerId, curEntry);
+ try {
+ lep.readPage(fi);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // process a page
+ for (int i = 0; i < entriesPerPage; i++) {
+ long offset = lep.getOffset(i * 8);
+ if (offset != 0) {
+ entries.put(curEntry, offset);
+ }
+ ++curEntry;
+ }
+
+ curSize += pageSize;
+ return entries;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("Cannot remove");
+ }
+
+ };
+
+ return new Iterable<SortedMap<Long, Long>>() {
+ public Iterator<SortedMap<Long, Long>> iterator() {
+ return iterator;
+ }
+ };
+ }
+
/**
* Scan over an entry log file.
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index fa8001677..b0257ec03 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -30,8 +30,6 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
import java.util.SortedMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
@@ -53,9 +51,11 @@
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -714,8 +714,8 @@ public long getLastAddConfirmed(long ledgerId) throws
IOException {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
- throws IOException {
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId, long
previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws
IOException {
throw new UnsupportedOperationException();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
new file mode 100644
index 000000000..6cf623267
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan all entries in the entry log and rebuild the ledgerStorageIndex.
+ */
+public class LocationsIndexRebuildOp {
+ private final ServerConfiguration conf;
+
+ public LocationsIndexRebuildOp(ServerConfiguration conf) {
+ this.conf = conf;
+ }
+
+ public void initiate() throws IOException {
+ LOG.info("Starting index rebuilding");
+
+ // Move locations index to a backup directory
+ String basePath =
Bookie.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
+ Path currentPath = FileSystems.getDefault().getPath(basePath,
"locations");
+ String timestamp = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+ Path backupPath = FileSystems.getDefault().getPath(basePath,
"locations.BACKUP-" + timestamp);
+ Files.move(currentPath, backupPath);
+
+ LOG.info("Created locations index backup at {}", backupPath);
+
+ long startTime = System.nanoTime();
+
+ EntryLogger entryLogger = new EntryLogger(conf, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold())));
+ Set<Long> entryLogs = entryLogger.getEntryLogsSet();
+
+ String locationsDbPath = FileSystems.getDefault().getPath(basePath,
"locations").toFile().toString();
+
+ Set<Long> activeLedgers = getActiveLedgers(conf,
KeyValueStorageRocksDB.factory, basePath);
+ LOG.info("Found {} active ledgers in ledger manager",
activeLedgers.size());
+
+ KeyValueStorage newIndex =
KeyValueStorageRocksDB.factory.newKeyValueStorage(locationsDbPath,
DbConfigType.Huge,
+ conf);
+
+ int totalEntryLogs = entryLogs.size();
+ int completedEntryLogs = 0;
+ LOG.info("Scanning {} entry logs", totalEntryLogs);
+
+ for (long entryLogId : entryLogs) {
+ entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
+ @Override
+ public void process(long ledgerId, long offset, ByteBuf entry)
throws IOException {
+ long entryId = entry.getLong(8);
+
+ // Actual location indexed is pointing past the entry size
+ long location = (entryLogId << 32L) | (offset + 4);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Rebuilding {}:{} at location {} / {}",
ledgerId, entryId, location >> 32,
+ location & (Integer.MAX_VALUE - 1));
+ }
+
+ // Update the ledger index page
+ LongPairWrapper key = LongPairWrapper.get(ledgerId,
entryId);
+ LongWrapper value = LongWrapper.get(location);
+ newIndex.put(key.array, value.array);
+ }
+
+ @Override
+ public boolean accept(long ledgerId) {
+ return activeLedgers.contains(ledgerId);
+ }
+ });
+
+ ++completedEntryLogs;
+ LOG.info("Completed scanning of log {}.log -- {} / {}",
Long.toHexString(entryLogId), completedEntryLogs,
+ totalEntryLogs);
+ }
+
+ newIndex.sync();
+ newIndex.close();
+
+ LOG.info("Rebuilding index is done. Total time: {}",
+
DurationFormatUtils.formatDurationHMS(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
- startTime)));
+ }
+
+ private Set<Long> getActiveLedgers(ServerConfiguration conf,
KeyValueStorageFactory storageFactory, String basePath)
+ throws IOException {
+ LedgerMetadataIndex ledgers = new LedgerMetadataIndex(conf,
storageFactory, basePath, NullStatsLogger.INSTANCE);
+ Set<Long> activeLedgers = Sets.newHashSet();
+ for (Long ledger : ledgers.getActiveLedgersInRange(0, Long.MAX_VALUE))
{
+ activeLedgers.add(ledger);
+ }
+
+ ledgers.close();
+ return activeLedgers;
+ }
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LocationsIndexRebuildOp.class);
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
new file mode 100644
index 000000000..a954f28ad
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for BookieShell convert-to-interleaved-storage command.
+ */
+@Slf4j
+public class ConversionRollbackTest {
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ @Test
+ public void convertFromDbStorageToInterleaved() throws Exception {
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ log.info("Using temp directory: {}", tmpDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ conf.setAllowLoopback(true);
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ // Insert some ledger & entries in the dbStorage
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ dbStorage.setMasterKey(ledgerId, ("ledger-" +
ledgerId).getBytes());
+ dbStorage.setFenced(ledgerId);
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ dbStorage.addEntry(entry);
+ }
+ }
+
+ dbStorage.flush();
+ dbStorage.shutdown();
+
+ // Run conversion tool
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(new String[] { "convert-to-interleaved-storage" });
+
+ Assert.assertEquals(0, res);
+
+ // Verify that interleaved storage index has the same entries
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ Set<Long> ledgers =
Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
+
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ Assert.assertEquals(true, interleavedStorage.isFenced(ledgerId));
+ Assert.assertEquals("ledger-" + ledgerId, new
String(interleavedStorage.readMasterKey(ledgerId)));
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ByteBuf result = interleavedStorage.getEntry(ledgerId,
entryId);
+ Assert.assertEquals(entry, result);
+ }
+ }
+
+ interleavedStorage.shutdown();
+ FileUtils.forceDelete(tmpDir);
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
new file mode 100644
index 000000000..df5434cb5
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for BookieShell convert-to-db-storage command.
+ */
+public class ConversionTest {
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ @Test
+ public void test() throws Exception {
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ System.out.println(tmpDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ conf.setAllowLoopback(true);
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ // Insert some ledger & entries in the interleaved storage
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ interleavedStorage.setMasterKey(ledgerId, ("ledger-" +
ledgerId).getBytes());
+ interleavedStorage.setFenced(ledgerId);
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ interleavedStorage.addEntry(entry);
+ }
+ }
+
+ interleavedStorage.flush();
+ interleavedStorage.shutdown();
+
+ // Run conversion tool
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(new String[] { "convert-to-db-storage" });
+
+ Assert.assertEquals(0, res);
+
+ // Verify that db index has the same entries
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ interleavedStorage = new InterleavedLedgerStorage();
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ Set<Long> ledgers =
Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
+
+ ledgers =
Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(), ledgers);
+
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ Assert.assertEquals(true, dbStorage.isFenced(ledgerId));
+ Assert.assertEquals("ledger-" + ledgerId, new
String(dbStorage.readMasterKey(ledgerId)));
+
+ for (long entryId = 0; entryId < 10000; entryId++) {
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ByteBuf result = dbStorage.getEntry(ledgerId, entryId);
+ Assert.assertEquals(entry, result);
+ result.release();
+
+ try {
+ interleavedStorage.getEntry(ledgerId, entryId);
+ Assert.fail("entry should not exist");
+ } catch (NoLedgerException e) {
+ // Ok
+ }
+ }
+ }
+
+ interleavedStorage.shutdown();
+ dbStorage.shutdown();
+ FileUtils.forceDelete(tmpDir);
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
new file mode 100644
index 000000000..4793ec71a
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for class {@link LocationsIndexRebuildOp}.
+ */
+public class LocationsIndexRebuildTest {
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+ };
+
+ @Test
+ public void test() throws Exception {
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ System.out.println(tmpDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setAllowLoopback(true);
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+
+ DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ // Insert some ledger & entries in the storage
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ ledgerStorage.setMasterKey(ledgerId, ("ledger-" +
ledgerId).getBytes());
+ ledgerStorage.setFenced(ledgerId);
+
+ for (long entryId = 0; entryId < 100; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ledgerStorage.addEntry(entry);
+ }
+ }
+
+ ledgerStorage.flush();
+ ledgerStorage.shutdown();
+
+ // Rebuild index through the tool
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(new String[] { "rebuild-db-ledger-locations-index"
});
+
+ Assert.assertEquals(0, res);
+
+ // Verify that db index has the same entries
+ ledgerStorage = new DbLedgerStorage();
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE);
+
+ Set<Long> ledgers =
Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+ Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
+
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ Assert.assertEquals(true, ledgerStorage.isFenced(ledgerId));
+ Assert.assertEquals("ledger-" + ledgerId, new
String(ledgerStorage.readMasterKey(ledgerId)));
+
+ ByteBuf lastEntry = ledgerStorage.getLastEntry(ledgerId);
+ assertEquals(ledgerId, lastEntry.readLong());
+ long lastEntryId = lastEntry.readLong();
+ assertEquals(99, lastEntryId);
+
+ for (long entryId = 0; entryId < 100; entryId++) {
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ByteBuf result = ledgerStorage.getEntry(ledgerId, entryId);
+ Assert.assertEquals(entry, result);
+ }
+ }
+
+ ledgerStorage.shutdown();
+ FileUtils.forceDelete(tmpDir);
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services