This is an automated email from the ASF dual-hosted git repository.
hsaputra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 0b93249 ISSUE #2773: Add db ledgers index rebuild op
0b93249 is described below
commit 0b93249d25419de9714f22c396d371940be448f9
Author: Jack Vanlightly <[email protected]>
AuthorDate: Fri Sep 24 02:47:07 2021 +0200
ISSUE #2773: Add db ledgers index rebuild op
### Motivation
File corruption of the ledgers index (DbLedgerStorage) has happened in
production and we currently have no good way of resolving it. There exists a
locations index rebuild command, this issue describes a new command targeted at
the ledgers index.
This command should only be run when either the bookie is offline, or when
in readonly mode to avoid data loss. Container based environments may not be
able to make the bookie go offline while also allowing the shell or bkctl to
perform the operation.
The db ledgers index rebuild op does the following:
- scans all journal and entry log files to discover all ledger ids
currently stored.
- builds a new index where each ledger is fenced and has an empty master
key set.
- If all went well, replaces the original index with the new one
The bookie should then be restarted again (in normal mode) to load the
rebuilt index.
Notable stuff:
The reason for setting fencing each ledger is that there is no safe way of
setting the fenced status based on metadata while the bookie is running. If the
command is run when the bookie is in readonly mode, it can still serve fence
requests and any fencing that occurs while the rebuild is occurring will be
lost, which breaks the data safety guarantees of the BookKeeper protocol. Given
also that the bookie should at least be in readonly mode (else offline), it is
likely a member of the [...]
The reason for setting an empty master key is that firstly, an empty master
key simply gets overwritten, so cannot cause an IllegalOpException. Secondly,
if we use the password stored in metadata, then we need to be sure to use
exactly the same digest algorithm as the client when it creates the ledgerKey,
else the bookie will start failing all writes. This could potentially cause a
problem in the future if the way the ledgerKey is generated changes (old
clients would be incompatible a [...]
### Changes
Adds two new commands. "rebuild-db-ledgers-index" rebuilds the db ledgers
index by scanning the journal and entry log files. "check-db-ledgers-index"
performs a read scan of the db ledgers index, this can be used when corruption
is suspected.
Additionally:
- a test of the new rebuild op
- makes the Journal. listJournalIds() method public so that the command can
iterate over the journal files.
- rewords some printed statements in the LocationsIndexRebuildOp to make it
clear which index it is rebuilding (given that there are two indexes in the
DbLedgerStorage).
Reviewers: Enrico Olivelli <[email protected]>
This closes #2774 from Vanlightly/rebuild-ledgers-index-op, closes #2773
and squashes the following commits:
93bcf2e78 [Jack Vanlightly] Fix checkstyle
5d679c8ef [Jack Vanlightly] Improved logging of ledgers index check and
rebuild ops
a9789870f [Jack Vanlightly] Add db ledgers index rebuild op
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 87 ++++++++
.../java/org/apache/bookkeeper/bookie/Journal.java | 2 +-
.../bookie/storage/ldb/LedgersIndexCheckOp.java | 103 ++++++++++
.../bookie/storage/ldb/LedgersIndexRebuildOp.java | 218 +++++++++++++++++++++
.../storage/ldb/LocationsIndexRebuildOp.java | 4 +-
.../bookie/CheckDBLedgersIndexCommand.java | 81 ++++++++
.../RebuildDBLedgerLocationsIndexCommand.java | 4 +-
.../bookie/RebuildDBLedgersIndexCommand.java | 76 +++++++
.../storage/ldb/LedgersIndexRebuildTest.java | 173 ++++++++++++++++
.../tools/cli/commands/BookieCommandGroup.java | 4 +
10 files changed, 747 insertions(+), 5 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 461623e..8d00220 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -51,6 +51,7 @@ import
org.apache.bookkeeper.tools.cli.commands.autorecovery.QueryAutoRecoverySt
import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand;
import
org.apache.bookkeeper.tools.cli.commands.autorecovery.TriggerAuditCommand;
import
org.apache.bookkeeper.tools.cli.commands.autorecovery.WhoIsAuditorCommand;
+import
org.apache.bookkeeper.tools.cli.commands.bookie.CheckDBLedgersIndexCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FlipBookieIdCommand;
@@ -67,6 +68,7 @@ import
org.apache.bookkeeper.tools.cli.commands.bookie.ReadLedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ReadLogCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ReadLogMetadataCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.RebuildDBLedgerLocationsIndexCommand;
+import
org.apache.bookkeeper.tools.cli.commands.bookie.RebuildDBLedgersIndexCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.RegenerateInterleavedStorageIndexFileCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.UpdateBookieInLedgerCommand;
@@ -155,6 +157,8 @@ public class BookieShell implements Tool {
static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE =
"convert-to-interleaved-storage";
static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX =
"rebuild-db-ledger-locations-index";
+ static final String CMD_REBUILD_DB_LEDGERS_INDEX =
"rebuild-db-ledgers-index";
+ static final String CMD_CHECK_DB_LEDGERS_INDEX = "check-db-ledgers-index";
static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE =
"regenerate-interleaved-storage-index-file";
static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus";
@@ -2108,6 +2112,87 @@ public class BookieShell implements Tool {
}
/**
+ * Rebuild DbLedgerStorage ledgers index.
+ */
+ class RebuildDbLedgersIndexCmd extends MyCommand {
+ Options opts = new Options();
+
+ public RebuildDbLedgersIndexCmd() {
+ super(CMD_REBUILD_DB_LEDGERS_INDEX);
+ }
+
+ @Override
+ Options getOptions() {
+ opts.addOption("v", "verbose", false, "Verbose logging, print the
ledgers added to the new index");
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Rebuild DbLedgerStorage ledgers index by scanning "
+ + "the journal and entry logs (sets all ledgers to fenced)";
+ }
+
+ @Override
+ String getUsage() {
+ return CMD_REBUILD_DB_LEDGERS_INDEX;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ RebuildDBLedgersIndexCommand.RebuildLedgersIndexFlags flags =
+ new
RebuildDBLedgersIndexCommand.RebuildLedgersIndexFlags();
+ flags.verbose(cmdLine.hasOption("v"));
+ RebuildDBLedgersIndexCommand cmd = new
RebuildDBLedgersIndexCommand();
+ if (cmd.apply(bkConf, flags)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ /**
+ * Rebuild DbLedgerStorage ledgers index.
+ */
+ class CheckDbLedgersIndexCmd extends MyCommand {
+ Options opts = new Options();
+
+ public CheckDbLedgersIndexCmd() {
+ super(CMD_CHECK_DB_LEDGERS_INDEX);
+ }
+
+ @Override
+ Options getOptions() {
+ opts.addOption("v", "verbose", false, "Verbose logging, print the
ledger data in the index.");
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Check DbLedgerStorage ledgers index by performing a read
scan";
+ }
+
+ @Override
+ String getUsage() {
+ return CMD_CHECK_DB_LEDGERS_INDEX;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ CheckDBLedgersIndexCommand.CheckLedgersIndexFlags flags =
+ new CheckDBLedgersIndexCommand.CheckLedgersIndexFlags();
+ flags.verbose(cmdLine.hasOption("v"));
+ CheckDBLedgersIndexCommand cmd = new CheckDBLedgersIndexCommand();
+ if (cmd.apply(bkConf, flags)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ /**
* Regenerate an index file for interleaved storage.
*/
class RegenerateInterleavedStorageIndexFile extends MyCommand {
@@ -2205,6 +2290,8 @@ public class BookieShell implements Tool {
commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new
ConvertToInterleavedStorageCmd());
commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new
RebuildDbLedgerLocationsIndexCmd());
+ commands.put(CMD_REBUILD_DB_LEDGERS_INDEX, new
RebuildDbLedgersIndexCmd());
+ commands.put(CMD_CHECK_DB_LEDGERS_INDEX, new CheckDbLedgersIndexCmd());
commands.put(CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE, new
RegenerateInterleavedStorageIndexFile());
commands.put(CMD_HELP, new HelpCmd());
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new
LostBookieRecoveryDelayCmd());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 489974c..d225bff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -103,7 +103,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
* @param filter journal id filter
* @return list of filtered ids
*/
- static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
+ public static List<Long> listJournalIds(File journalDir, JournalIdFilter
filter) {
File[] logFiles = journalDir.listFiles();
if (logFiles == null || logFiles.length == 0) {
return Collections.emptyList();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexCheckOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexCheckOp.java
new file mode 100644
index 0000000..a48de34
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexCheckOp.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan the ledgers index to make sure it is readable.
+ */
+public class LedgersIndexCheckOp {
+ private static final Logger LOG =
LoggerFactory.getLogger(LedgersIndexCheckOp.class);
+
+ private final ServerConfiguration conf;
+ private final boolean verbose;
+ private static final String LedgersSubPath = "ledgers";
+
+ public LedgersIndexCheckOp(ServerConfiguration conf, boolean verbose) {
+ this.conf = conf;
+ this.verbose = verbose;
+ }
+
+ public boolean initiate() throws IOException {
+ String basePath =
BookieImpl.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
+ Path currentPath = FileSystems.getDefault().getPath(basePath,
LedgersSubPath);
+
+ LOG.info("Loading ledgers index from {}", currentPath);
+
+ long startTime = System.nanoTime();
+ LOG.info("Starting index scan");
+
+ try {
+ KeyValueStorage index = new KeyValueStorageRocksDB(basePath,
LedgersSubPath,
+ DbConfigType.Small, conf, true);
+ // Read all ledgers from db
+ KeyValueStorage.CloseableIterator<Map.Entry<byte[], byte[]>>
iterator = index.iterator();
+ int ctr = 0;
+ try {
+ while (iterator.hasNext()) {
+ ctr++;
+ Map.Entry<byte[], byte[]> entry = iterator.next();
+ long ledgerId = ArrayUtil.getLong(entry.getKey(), 0);
+ DbLedgerStorageDataFormats.LedgerData ledgerData =
+
DbLedgerStorageDataFormats.LedgerData.parseFrom(entry.getValue());
+ if (verbose) {
+ LOG.info("Scanned: {}, ledger: {}, exists: {},
isFenced: {}, masterKey: {}, explicitLAC: {}",
+ ctr,
+ ledgerId,
+ (ledgerData.hasExists() ?
ledgerData.getExists() : "-"),
+ (ledgerData.hasFenced() ?
ledgerData.getFenced() : "-"),
+ (ledgerData.hasMasterKey()
+ ? Base64.getEncoder()
+
.encodeToString(ledgerData.getMasterKey().toByteArray())
+ : "-"),
+ (ledgerData.hasExplicitLac() ?
ledgerData.getExplicitLac() : "-"));
+ } else if (ctr % 100 == 0) {
+ LOG.info("Scanned {} ledgers", ctr);
+ }
+ }
+ } finally {
+ iterator.close();
+ }
+
+ LOG.info("Scanned {} ledgers", ctr);
+ LOG.info("Index scan has completed successfully. Total time: {}",
+ DurationFormatUtils.formatDurationHMS(
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTime)));
+ } catch (Throwable t) {
+ LOG.error("Index scan has failed with error", t);
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
new file mode 100644
index 0000000..5e44b76
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.Journal;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.DiskChecker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan all entries in the journal and entry log files then rebuilds the
ledgers index.
+ * Notable stuff:
+ * - Fences every ledger as even if we check the metadata, we cannot guarantee
that
+ * a fence request was served while the rebuild was taking place (even if
the bookie
+ * is running in read-only mode).
+ * Losing the fenced status of a ledger is UNSAFE.
+ * - Sets the master key as an empty byte array. This is correct as empty
master keys
+ * are overwritten and we cannot use the password from metadata, and cannot
know 100%
+ * for sure how a digest for the password was generated.
+ */
+public class LedgersIndexRebuildOp {
+ private static final Logger LOG =
LoggerFactory.getLogger(LedgersIndexRebuildOp.class);
+
+ private final ServerConfiguration conf;
+ private final boolean verbose;
+ private static final String LedgersSubPath = "ledgers";
+
+ public LedgersIndexRebuildOp(ServerConfiguration conf, boolean verbose) {
+ this.conf = conf;
+ this.verbose = verbose;
+ }
+
+ public boolean initiate() {
+ LOG.info("Starting ledger index rebuilding");
+
+ String timestamp = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+ String basePath =
BookieImpl.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
+ String tempLedgersSubPath = LedgersSubPath + ".TEMP-" + timestamp;
+ Path tempPath = FileSystems.getDefault().getPath(basePath,
tempLedgersSubPath);
+ Path currentPath = FileSystems.getDefault().getPath(basePath,
LedgersSubPath);
+
+ LOG.info("Starting scan phase (scans journal and entry log files)");
+
+ try {
+ Set<Long> ledgers = new HashSet<>();
+ scanJournals(ledgers);
+ scanEntryLogFiles(ledgers);
+
+ LOG.info("Scan complete, found {} ledgers. "
+ + "Starting to build a new ledgers index", ledgers.size());
+
+ try (KeyValueStorage newIndex =
KeyValueStorageRocksDB.factory.newKeyValueStorage(
+ basePath, tempLedgersSubPath, DbConfigType.Small, conf)) {
+ LOG.info("Created ledgers index at temp location {}",
tempPath);
+
+ for (Long ledgerId : ledgers) {
+ DbLedgerStorageDataFormats.LedgerData ledgerData =
+ DbLedgerStorageDataFormats.LedgerData.newBuilder()
+ .setExists(true)
+ .setFenced(true)
+ .setMasterKey(ByteString.EMPTY).build();
+
+ byte[] ledgerArray = new byte[16];
+ ArrayUtil.setLong(ledgerArray, 0, ledgerId);
+ newIndex.put(ledgerArray, ledgerData.toByteArray());
+ }
+
+ newIndex.sync();
+ }
+ } catch (Throwable t) {
+ LOG.error("Error during rebuild, the original index remains
unchanged", t);
+ delete(tempPath);
+ return false;
+ }
+
+ // replace the existing index
+ try {
+ Path prevPath = FileSystems.getDefault().getPath(basePath,
LedgersSubPath + ".PREV-" + timestamp);
+ LOG.info("Moving original index from original location: {} up to
back-up location: {}",
+ currentPath, prevPath);
+ Files.move(currentPath, prevPath);
+ LOG.info("Moving rebuilt index from: {} to: {}", tempPath,
currentPath);
+ Files.move(tempPath, currentPath);
+ LOG.info("Original index has been replaced with the new index. "
+ + "The original index has been moved to {}", prevPath);
+ } catch (IOException e) {
+ LOG.error("Could not replace original index with rebuilt index. "
+ + "To return to the original state, ensure the original
index is in its original location", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ private void scanEntryLogFiles(Set<Long> ledgers) throws IOException {
+ EntryLogger entryLogger = new EntryLogger(conf, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold())));
+ Set<Long> entryLogs = entryLogger.getEntryLogsSet();
+
+ int totalEntryLogs = entryLogs.size();
+ int completedEntryLogs = 0;
+ LOG.info("Scanning {} entry logs", totalEntryLogs);
+
+ for (long entryLogId : entryLogs) {
+ entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
+ @Override
+ public void process(long ledgerId, long offset, ByteBuf entry)
throws IOException {
+ if (ledgers.add(ledgerId)) {
+ if (verbose) {
+ LOG.info("Found ledger {} in entry log", ledgerId);
+ }
+ }
+ }
+
+ @Override
+ public boolean accept(long ledgerId) {
+ return true;
+ }
+ });
+
+ ++completedEntryLogs;
+ LOG.info("Completed scanning of log {}.log -- {} / {}",
Long.toHexString(entryLogId), completedEntryLogs,
+ totalEntryLogs);
+ }
+ }
+
+ private void scanJournals(Set<Long> ledgers) throws IOException {
+ for (Journal journal : getJournals(conf)) {
+ List<Long> journalIds =
Journal.listJournalIds(journal.getJournalDirectory(),
+ new Journal.JournalIdFilter() {
+ @Override
+ public boolean accept(long journalId) {
+ return true;
+ }
+ });
+
+ for (Long journalId : journalIds) {
+ scanJournal(journal, journalId, ledgers);
+ }
+ }
+ }
+
+ private List<Journal> getJournals(ServerConfiguration conf) {
+ List<Journal> journals =
Lists.newArrayListWithCapacity(conf.getJournalDirs().length);
+ int idx = 0;
+ for (File journalDir : conf.getJournalDirs()) {
+ journals.add(new Journal(idx++, new File(journalDir,
BookKeeperConstants.CURRENT_DIR), conf,
+ new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()))));
+ }
+
+ return journals;
+ }
+
+ private void scanJournal(Journal journal, long journalId, Set<Long>
ledgers) throws IOException {
+ LOG.info("Scanning journal " + journalId + " (" +
Long.toHexString(journalId) + ".txn)");
+ journal.scanJournal(journalId, 0L, new Journal.JournalScanner() {
+ @Override
+ public void process(int journalVersion, long offset, ByteBuffer
entry) {
+ ByteBuf buf = Unpooled.wrappedBuffer(entry);
+ long ledgerId = buf.readLong();
+
+ if (ledgers.add(ledgerId) && verbose) {
+ LOG.info("Found ledger {} in journal", ledgerId);
+ }
+ }
+ });
+ }
+
+ private void delete(Path path) {
+ try {
+ Files.delete(path);
+ } catch (IOException e) {
+ LOG.warn("Unable to delete {}", path.toAbsolutePath(), e);
+ }
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
index 6c9ca2c..7eae6da 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -46,7 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Scan all entries in the entry log and rebuild the ledgerStorageIndex.
+ * Scan all entries in the entry log and rebuild the locations index.
*/
public class LocationsIndexRebuildOp {
private final ServerConfiguration conf;
@@ -56,7 +56,7 @@ public class LocationsIndexRebuildOp {
}
public void initiate() throws IOException {
- LOG.info("Starting index rebuilding");
+ LOG.info("Starting locations index rebuilding");
// Move locations index to a backup directory
String basePath =
BookieImpl.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/CheckDBLedgersIndexCommand.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/CheckDBLedgersIndexCommand.java
new file mode 100644
index 0000000..fe844e5
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/CheckDBLedgersIndexCommand.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import com.beust.jcommander.Parameter;
+import java.io.IOException;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.bookie.storage.ldb.LedgersIndexCheckOp;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command to check the DBLedgerStorage ledgers index integrity.
+ */
+public class CheckDBLedgersIndexCommand extends
BookieCommand<CheckDBLedgersIndexCommand.CheckLedgersIndexFlags> {
+
+ static final Logger LOG =
LoggerFactory.getLogger(CheckDBLedgersIndexCommand.class);
+
+ private static final String NAME = "check-db-ledgers-index";
+ private static final String DESC = "Check the DBLedgerStorage ledgers
index integrity by performing a read scan";
+
+ public CheckDBLedgersIndexCommand() {
+ this(new CheckLedgersIndexFlags());
+ }
+
+ public CheckDBLedgersIndexCommand(CheckLedgersIndexFlags flags) {
+ super(CliSpec.<CheckLedgersIndexFlags>newBuilder().withName(NAME)
+ .withDescription(DESC).withFlags(flags).build());
+ }
+
+ @Override
+ public boolean apply(ServerConfiguration conf, CheckLedgersIndexFlags
cmdFlags) {
+ LOG.info("=== Checking DBStorage ledgers index by running a read scan
===");
+ ServerConfiguration serverConfiguration = new
ServerConfiguration(conf);
+ try {
+ boolean success = new LedgersIndexCheckOp(serverConfiguration,
cmdFlags.verbose).initiate();
+ if (success) {
+ LOG.info("-- Done checking DBStorage ledgers index --");
+ } else {
+ LOG.info("-- Aborted checking DBStorage ledgers index --");
+ }
+
+ return success;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+
+ }
+
+ /**
+ * Flags for read log command.
+ */
+ @Accessors(fluent = true)
+ @Setter
+ public static class CheckLedgersIndexFlags extends CliFlags {
+ @Parameter(names = { "-v", "--verbose" }, description = "Verbose
logging. Print each ledger.")
+ private boolean verbose;
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/RebuildDBLedgerLocationsIndexCommand.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/RebuildDBLedgerLocationsIndexCommand.java
index fda5945..8cbb986 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/RebuildDBLedgerLocationsIndexCommand.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/RebuildDBLedgerLocationsIndexCommand.java
@@ -43,14 +43,14 @@ public class RebuildDBLedgerLocationsIndexCommand extends
BookieCommand<CliFlags
@Override
public boolean apply(ServerConfiguration conf, CliFlags cmdFlags) {
- LOG.info("=== Rebuilding bookie index ===");
+ LOG.info("=== Rebuilding DBStorage locations index ===");
ServerConfiguration serverConfiguration = new
ServerConfiguration(conf);
try {
new LocationsIndexRebuildOp(serverConfiguration).initiate();
} catch (IOException e) {
e.printStackTrace();
}
- LOG.info("-- Done rebuilding bookie index --");
+ LOG.info("-- Done rebuilding DBStorage locations index --");
return true;
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/RebuildDBLedgersIndexCommand.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/RebuildDBLedgersIndexCommand.java
new file mode 100644
index 0000000..b37f968
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/RebuildDBLedgersIndexCommand.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import com.beust.jcommander.Parameter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.bookie.storage.ldb.LedgersIndexRebuildOp;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command to rebuild DBLedgerStorage ledgers index.
+ */
+public class RebuildDBLedgersIndexCommand extends
BookieCommand<RebuildDBLedgersIndexCommand.RebuildLedgersIndexFlags> {
+
+ static final Logger LOG =
LoggerFactory.getLogger(RebuildDBLedgersIndexCommand.class);
+
+ private static final String NAME = "rebuild-db-ledgers-index";
+ private static final String DESC = "Rebuild DBLedgerStorage ledgers index
by scanning the journal"
+ + " and entry logs (sets all ledgers to fenced)";
+
+ public RebuildDBLedgersIndexCommand() {
+ this(new RebuildLedgersIndexFlags());
+ }
+
+ public RebuildDBLedgersIndexCommand(RebuildLedgersIndexFlags flags) {
+
super(CliSpec.<RebuildDBLedgersIndexCommand.RebuildLedgersIndexFlags>newBuilder().withName(NAME)
+ .withDescription(DESC).withFlags(flags).build());
+ }
+
+ @Override
+ public boolean apply(ServerConfiguration conf, RebuildLedgersIndexFlags
cmdFlags) {
+ LOG.info("=== Rebuilding DBStorage ledgers index ===");
+ ServerConfiguration serverConfiguration = new
ServerConfiguration(conf);
+ boolean success = new LedgersIndexRebuildOp(serverConfiguration,
cmdFlags.verbose).initiate();
+ if (success) {
+ LOG.info("-- Done rebuilding DBStorage ledgers index --");
+ } else {
+ LOG.info("-- Aborted rebuilding DBStorage ledgers index --");
+ }
+
+ return success;
+ }
+
+ /**
+ * Flags for read log command.
+ */
+ @Accessors(fluent = true)
+ @Setter
+ public static class RebuildLedgersIndexFlags extends CliFlags {
+ @Parameter(names = { "-v", "--verbose" },
+ description = "Verbose logging. Print each ledger id found and
added to the rebuilt index")
+ private boolean verbose;
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildTest.java
new file mode 100644
index 0000000..e178d71
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildTest.java
@@ -0,0 +1,173 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test for class {@link LedgersIndexRebuildOp}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ LedgersIndexRebuildTest.class, MetadataDrivers.class })
+public class LedgersIndexRebuildTest {
+
+ private final BookieId bookieAddress =
BookieId.parse(UUID.randomUUID().toString());
+ private ServerConfiguration conf;
+ private File tmpDir;
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+
+ @Override
+ public void start() {
+ // no-op
+ }
+ };
+
+ @Before
+ public void setUp() throws IOException {
+ tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = BookieImpl.getCurrentDirectory(tmpDir);
+ BookieImpl.checkDirectoryStructure(curDir);
+
+ System.out.println(tmpDir);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.forceDelete(tmpDir);
+ }
+
+ @Test
+ public void testRebuildIncludesAllLedgersAndSetToFenced() throws Exception
{
+ byte[] masterKey = "12345".getBytes();
+ long ledgerCount = 100;
+
+ // no attempts to get ledger metadata fail
+ DbLedgerStorage ledgerStorage = setupLedgerStorage();
+
+ // Insert some ledger & entries in the storage
+ for (long ledgerId = 0; ledgerId < ledgerCount; ledgerId++) {
+ ledgerStorage.setMasterKey(ledgerId, masterKey);
+
+ for (long entryId = 0; entryId < 2; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ledgerStorage.addEntry(entry);
+ }
+ }
+
+ ledgerStorage.flush();
+ ledgerStorage.shutdown();
+
+ // Rebuild index through the tool
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(new String[] { "rebuild-db-ledgers-index", "-v" });
+
+ Assert.assertEquals(0, res);
+
+ // Verify that the ledgers index has the ledgers and that they are
fenced
+ ledgerStorage = new DbLedgerStorage();
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, null, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
+
+ for (long ledgerId = 0; ledgerId < ledgerCount; ledgerId++) {
+ assertTrue(ledgerStorage.ledgerExists(ledgerId));
+ assertTrue(ledgerStorage.isFenced(ledgerId));
+ }
+
+ ledgerStorage.shutdown();
+ }
+
+ private DbLedgerStorage setupLedgerStorage() throws Exception {
+ conf = TestBKConfiguration.newServerConfiguration();
+ conf.setBookieId(bookieAddress.getId());
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+
+
PowerMockito.whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf);
+
+
PowerMockito.whenNew(BookieId.class).withParameterTypes(String.class).withArguments(anyString())
+ .thenReturn(bookieAddress);
+
+ DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, null, checkpointSource, checkpointer,
+ NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
+
+ return ledgerStorage;
+ }
+}
diff --git
a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
index 54e9f7c..1f68e69 100644
---
a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
+++
b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.tools.cli.commands;
import static
org.apache.bookkeeper.tools.common.BKCommandCategories.CATEGORY_INFRA_SERVICE;
import org.apache.bookkeeper.tools.cli.BKCtl;
+import
org.apache.bookkeeper.tools.cli.commands.bookie.CheckDBLedgersIndexCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FlipBookieIdCommand;
@@ -36,6 +37,7 @@ import
org.apache.bookkeeper.tools.cli.commands.bookie.ReadLedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ReadLogCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ReadLogMetadataCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.RebuildDBLedgerLocationsIndexCommand;
+import
org.apache.bookkeeper.tools.cli.commands.bookie.RebuildDBLedgersIndexCommand;
import
org.apache.bookkeeper.tools.cli.commands.bookie.RegenerateInterleavedStorageIndexFileCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.EndpointInfoCommand;
@@ -68,6 +70,8 @@ public class BookieCommandGroup extends
CliCommandGroup<BKFlags> {
.addCommand(new ConvertToInterleavedStorageCommand())
.addCommand(new ReadJournalCommand())
.addCommand(new RebuildDBLedgerLocationsIndexCommand())
+ .addCommand(new RebuildDBLedgersIndexCommand())
+ .addCommand(new CheckDBLedgersIndexCommand())
.addCommand(new ReadLedgerCommand())
.addCommand(new ReadLogCommand())
.addCommand(new ReadLogMetadataCommand())