This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3188933 Utility to rebuild interleaved storage index files
3188933 is described below
commit 318893399b8040a7d291b8094f0d218a3b4e4f1e
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Sep 6 21:40:10 2018 -0700
Utility to rebuild interleaved storage index files
We came across a case where a ledger had been deleted from
zookeeper accidentally. It was possible to recover the ledger metadata
from the zookeeper journal and old snapshots, but the bookies had
deleted the indices by this time. However, even if the index is
deleted, the data still exists in the entrylog.
This utility scans the entrylog to rebuild the index, thereby making
the ledger available again.
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #1642 from ivankelly/regen-from-entrylogger
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 35 ++--
.../org/apache/bookkeeper/bookie/BookieShell.java | 69 ++++++-
.../InterleavedStorageRegenerateIndexOp.java | 230 +++++++++++++++++++++
.../org/apache/bookkeeper/client/LedgerHandle.java | 14 +-
.../bookkeeper/proto/checksum/DigestManager.java | 5 +
.../proto/checksum/MacDigestManager.java | 9 +
conf/log4j.shell.properties | 1 +
.../integration/utils/BookKeeperClusterUtils.java | 19 +-
.../tests/integration/BookieShellTestBase.java | 4 +-
.../tests/integration/TestBookieShellCluster.java | 99 +++++++++
10 files changed, 450 insertions(+), 35 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index e9e4d15..3795b37 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -629,17 +629,10 @@ public class Bookie extends BookieCriticalThread {
for (File journalDirectory : conf.getJournalDirs()) {
this.journalDirectories.add(getCurrentDirectory(journalDirectory));
}
- DiskChecker diskChecker = new
DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
- this.ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(), diskChecker,
- statsLogger.scope(LD_LEDGER_SCOPE));
-
- File[] idxDirs = conf.getIndexDirs();
- if (null == idxDirs) {
- this.indexDirsManager = this.ledgerDirsManager;
- } else {
- this.indexDirsManager = new LedgerDirsManager(conf, idxDirs,
diskChecker,
- statsLogger.scope(LD_INDEX_SCOPE));
- }
+ DiskChecker diskChecker = createDiskChecker(conf);
+ this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker,
statsLogger.scope(LD_LEDGER_SCOPE));
+ this.indexDirsManager = createIndexDirsManager(conf, diskChecker,
statsLogger.scope(LD_INDEX_SCOPE),
+ this.ledgerDirsManager);
// instantiate zookeeper client to initialize ledger manager
this.metadataDriver = instantiateMetadataDriver(conf);
@@ -675,7 +668,7 @@ public class Bookie extends BookieCriticalThread {
}
}
- if (null == idxDirs) {
+ if (ledgerDirsManager == indexDirsManager) {
this.idxMonitor = this.ledgerMonitor;
} else {
this.idxMonitor = new LedgerDirsMonitor(conf, diskChecker,
indexDirsManager);
@@ -1546,4 +1539,22 @@ public class Bookie extends BookieCriticalThread {
return exitCode;
}
+ static DiskChecker createDiskChecker(ServerConfiguration conf) {
+ return new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold());
+ }
+
+ static LedgerDirsManager createLedgerDirsManager(ServerConfiguration conf,
DiskChecker diskChecker,
+ StatsLogger statsLogger) {
+ return new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker,
statsLogger);
+ }
+
+ static LedgerDirsManager createIndexDirsManager(ServerConfiguration conf,
DiskChecker diskChecker,
+ StatsLogger statsLogger,
LedgerDirsManager fallback) {
+ File[] idxDirs = conf.getIndexDirs();
+ if (null == idxDirs) {
+ return fallback;
+ } else {
+ return new LedgerDirsManager(conf, idxDirs, diskChecker,
statsLogger);
+ }
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 4434cfc..8eebce5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -18,7 +18,7 @@
package org.apache.bookkeeper.bookie;
-import static com.google.common.base.Charsets.UTF_8;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver;
import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
@@ -51,6 +51,7 @@ import java.nio.file.attribute.FileTime;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -72,6 +73,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
@@ -187,6 +189,7 @@ public class BookieShell implements Tool {
static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE =
"convert-to-interleaved-storage";
static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX =
"rebuild-db-ledger-locations-index";
+ static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE =
"regenerate-interleaved-storage-index-file";
static final String CMD_HELP = "help";
final ServerConfiguration bkConf = new ServerConfiguration();
@@ -2816,6 +2819,69 @@ public class BookieShell implements Tool {
}
}
+ /**
+ * Regenerate an index file for interleaved storage.
+ */
+ class RegenerateInterleavedStorageIndexFile extends MyCommand {
+ Options opts = new Options();
+
+ public RegenerateInterleavedStorageIndexFile() {
+ super(CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE);
+ Option ledgerOption = new Option("l", "ledgerIds", true,
+ "Ledger(s) whose index needs to
be regenerated."
+ + " Multiple can be specified,
comma separated.");
+ ledgerOption.setRequired(true);
+ ledgerOption.setValueSeparator(',');
+ ledgerOption.setArgs(Option.UNLIMITED_VALUES);
+
+ opts.addOption(ledgerOption);
+ opts.addOption("dryRun", false,
+ "Process the entryLogger, but don't write
anything.");
+ opts.addOption("password", true,
+ "The bookie stores the password in the index file,
so we need it to regenerate. "
+ + "This must match the value in the ledger
metadata.");
+ opts.addOption("b64password", true,
+ "The password in base64 encoding, for cases where
the password is not UTF-8.");
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Regenerate an interleaved storage index file, from
available entrylogger files.";
+ }
+
+ @Override
+ String getUsage() {
+ return CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ byte[] password;
+ if (cmdLine.hasOption("password")) {
+ password = cmdLine.getOptionValue("password").getBytes(UTF_8);
+ } else if (cmdLine.hasOption("b64password")) {
+ password =
Base64.getDecoder().decode(cmdLine.getOptionValue("b64password"));
+ } else {
+ LOG.error("The password must be specified to regenerate the
index file.");
+ return 1;
+ }
+ Set<Long> ledgerIds =
Arrays.stream(cmdLine.getOptionValues("ledgerIds"))
+ .map((id) -> Long.parseLong(id)).collect(Collectors.toSet());
+ boolean dryRun = cmdLine.hasOption("dryRun");
+
+ LOG.info("=== Rebuilding index file for {} ===", ledgerIds);
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ new InterleavedStorageRegenerateIndexOp(conf, ledgerIds,
password).initiate(dryRun);
+ LOG.info("-- Done rebuilding index file for {} --", ledgerIds);
+ return 0;
+ }
+ }
+
final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
{
@@ -2849,6 +2915,7 @@ public class BookieShell implements Tool {
commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new
ConvertToInterleavedStorageCmd());
commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new
RebuildDbLedgerLocationsIndexCmd());
+ commands.put(CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE, new
RegenerateInterleavedStorageIndexFile());
commands.put(CMD_HELP, new HelpCmd());
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new
LostBookieRecoveryDelayCmd());
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
new file mode 100644
index 0000000..de48eaf
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -0,0 +1,230 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan all entries in the entry logs and rebuild the index files for the given set of ledgers.
+ */
+public class InterleavedStorageRegenerateIndexOp {
+ private static final Logger LOG =
LoggerFactory.getLogger(InterleavedStorageRegenerateIndexOp.class);
+
+ private final ServerConfiguration conf;
+ private final Set<Long> ledgerIds;
+ private final byte[] masterKey;
+
+ public InterleavedStorageRegenerateIndexOp(ServerConfiguration conf,
Set<Long> ledgerIds, byte[] password)
+ throws NoSuchAlgorithmException {
+ this.conf = conf;
+ this.ledgerIds = ledgerIds;
+ this.masterKey = DigestManager.generateMasterKey(password);
+ }
+
+ static class RecoveryStats {
+ long firstEntry = Long.MAX_VALUE;
+ long lastEntry = Long.MIN_VALUE;
+ long numEntries = 0;
+
+ void registerEntry(long entryId) {
+ numEntries++;
+ if (entryId < firstEntry) {
+ firstEntry = entryId;
+ }
+ if (entryId > lastEntry) {
+ lastEntry = entryId;
+ }
+ }
+
+ long getNumEntries() {
+ return numEntries;
+ }
+
+ long getFirstEntry() {
+ return firstEntry;
+ }
+
+ long getLastEntry() {
+ return lastEntry;
+ }
+ }
+
+ public void initiate(boolean dryRun) throws IOException {
+ LOG.info("Starting index rebuilding");
+
+ DiskChecker diskChecker = Bookie.createDiskChecker(conf);
+ LedgerDirsManager ledgerDirsManager = Bookie.createLedgerDirsManager(
+ conf, diskChecker, NullStatsLogger.INSTANCE);
+ LedgerDirsManager indexDirsManager = Bookie.createIndexDirsManager(
+ conf, diskChecker, NullStatsLogger.INSTANCE,
ledgerDirsManager);
+ EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+ final LedgerCache ledgerCache;
+ if (dryRun) {
+ ledgerCache = new DryRunLedgerCache();
+ } else {
+ ledgerCache = new LedgerCacheImpl(conf, new SnapshotMap<Long,
Boolean>(),
+ indexDirsManager,
NullStatsLogger.INSTANCE);
+ }
+
+ Set<Long> entryLogs = entryLogger.getEntryLogsSet();
+
+ int totalEntryLogs = entryLogs.size();
+ int completedEntryLogs = 0;
+ long startTime = System.nanoTime();
+
+ LOG.info("Scanning {} entry logs", totalEntryLogs);
+
+ Map<Long, RecoveryStats> stats = new HashMap<>();
+ for (long entryLogId : entryLogs) {
+ LOG.info("Scanning {}", entryLogId);
+ entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
+ @Override
+ public void process(long ledgerId, long offset, ByteBuf entry)
throws IOException {
+ long entryId = entry.getLong(8);
+
+ stats.computeIfAbsent(ledgerId, (ignore) -> new
RecoveryStats()).registerEntry(entryId);
+
+ // Actual location indexed is pointing past the entry size
+ long location = (entryLogId << 32L) | (offset + 4);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Rebuilding {}:{} at location {} / {}",
ledgerId, entryId, location >> 32,
+ location & (Integer.MAX_VALUE - 1));
+ }
+
+ if (!ledgerCache.ledgerExists(ledgerId)) {
+ ledgerCache.setMasterKey(ledgerId, masterKey);
+ ledgerCache.setFenced(ledgerId);
+ }
+ ledgerCache.putEntryOffset(ledgerId, entryId, location);
+ }
+
+ @Override
+ public boolean accept(long ledgerId) {
+ return ledgerIds.contains(ledgerId);
+ }
+ });
+
+ ledgerCache.flushLedger(true);
+
+ ++completedEntryLogs;
+ LOG.info("Completed scanning of log {}.log -- {} / {}",
Long.toHexString(entryLogId), completedEntryLogs,
+ totalEntryLogs);
+ }
+
+ LOG.info("Rebuilding indices done");
+ for (long ledgerId : ledgerIds) {
+ RecoveryStats ledgerStats = stats.get(ledgerId);
+ if (ledgerStats == null || ledgerStats.getNumEntries() == 0) {
+ LOG.info(" {} - No entries found", ledgerId);
+ } else {
+ LOG.info(" {} - Found {} entries, from {} to {}", ledgerId,
+ ledgerStats.getNumEntries(),
ledgerStats.getFirstEntry(), ledgerStats.getLastEntry());
+ }
+ }
+ LOG.info("Total time: {}", DurationFormatUtils.formatDurationHMS(
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTime)));
+ }
+
+
+ static class DryRunLedgerCache implements LedgerCache {
+ @Override
+ public void close() {
+ }
+ @Override
+ public boolean setFenced(long ledgerId) throws IOException {
+ return false;
+ }
+ @Override
+ public boolean isFenced(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void setMasterKey(long ledgerId, byte[] masterKey) throws
IOException {
+ }
+ @Override
+ public byte[] readMasterKey(long ledgerId) throws IOException,
BookieException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean ledgerExists(long ledgerId) throws IOException {
+ return false;
+ }
+ @Override
+ public void putEntryOffset(long ledger, long entry, long offset)
throws IOException {
+ }
+ @Override
+ public long getEntryOffset(long ledger, long entry) throws IOException
{
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void flushLedger(boolean doAll) throws IOException {
+ }
+ @Override
+ public long getLastEntry(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Long getLastAddConfirmed(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public long updateLastAddConfirmed(long ledgerId, long lac) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void deleteLedger(long ledgerId) throws IOException {
+ }
+ @Override
+ public void setExplicitLac(long ledgerId, ByteBuf lac) throws
IOException {
+ }
+ @Override
+ public ByteBuf getExplicitLac(long ledgerId) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 1b859a3..beddaed 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -34,7 +34,6 @@ import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -79,7 +78,6 @@ import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
import org.apache.bookkeeper.proto.checksum.DigestManager;
-import org.apache.bookkeeper.proto.checksum.MacDigestManager;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -150,16 +148,6 @@ public class LedgerHandle implements WriteHandle {
final Counter lacUpdateMissesCounter;
private final OpStatsLogger clientChannelWriteWaitStats;
- // This empty master key is used when an empty password is provided which
is the hash of an empty string
- private static final byte[] emptyLedgerKey;
- static {
- try {
- emptyLedgerKey = MacDigestManager.genDigest("ledger", new byte[0]);
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
-
public Map<Integer, BookieSocketAddress> getDelayedWriteFailedBookies() {
return delayedWriteFailedBookies;
}
@@ -197,7 +185,7 @@ public class LedgerHandle implements WriteHandle {
// If the password is empty, pass the same random ledger key which is
generated by the hash of the empty
// password, so that the bookie can avoid processing the keys for each
entry
- this.ledgerKey = password.length > 0 ?
MacDigestManager.genDigest("ledger", password) : emptyLedgerKey;
+ this.ledgerKey = DigestManager.generateMasterKey(password);
distributionSchedule = new RoundRobinDistributionSchedule(
metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(),
metadata.getEnsembleSize());
this.bookieFailureHistory = CacheBuilder.newBuilder()
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 4c174a8..1928637 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -86,6 +87,10 @@ public abstract class DigestManager {
}
}
+ public static byte[] generateMasterKey(byte[] password) throws
NoSuchAlgorithmException {
+ return password.length > 0 ? MacDigestManager.genDigest("ledger",
password) : MacDigestManager.EMPTY_LEDGER_KEY;
+ }
+
/**
* Computes the digest for an entry and put bytes together for sending.
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index 8d830a4..e71c077 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -46,6 +46,15 @@ public class MacDigestManager extends DigestManager {
final byte[] passwd;
+ static final byte[] EMPTY_LEDGER_KEY;
+ static {
+ try {
+ EMPTY_LEDGER_KEY = MacDigestManager.genDigest("ledger", new
byte[0]);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private final ThreadLocal<Mac> mac = new ThreadLocal<Mac>() {
@Override
protected Mac initialValue() {
diff --git a/conf/log4j.shell.properties b/conf/log4j.shell.properties
index caec948..7f8c00f 100644
--- a/conf/log4j.shell.properties
+++ b/conf/log4j.shell.properties
@@ -48,3 +48,4 @@ log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.bookkeeper=ERROR
log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
+log4j.logger.org.apache.bookkeeper.bookie.InterleavedStorageRegenerateIndexOp=INFO
\ No newline at end of file
diff --git
a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
index 8d02596..2d4fc27 100644
---
a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
+++
b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
@@ -24,6 +24,7 @@ import com.github.dockerjava.api.DockerClient;
import java.io.IOException;
import java.net.Socket;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -136,13 +137,13 @@ public class BookKeeperClusterUtils {
public static void updateAllBookieConf(DockerClient docker, String
version, String key, String value)
throws Exception {
- for (String b : DockerUtils.cubeIdsMatching("bookkeeper")) {
+ for (String b : allBookies()) {
updateBookieConf(docker, b, version, key, value);
}
}
public static boolean runOnAnyBookie(DockerClient docker, String... cmds)
throws Exception {
- Optional<String> bookie =
DockerUtils.cubeIdsMatching("bookkeeper").stream().findAny();
+ Optional<String> bookie = allBookies().stream().findAny();
if (bookie.isPresent()) {
DockerUtils.runCommand(docker, bookie.get(), cmds);
return true;
@@ -152,7 +153,7 @@ public class BookKeeperClusterUtils {
}
public static String getAnyBookie() throws Exception {
- Optional<String> bookie =
DockerUtils.cubeIdsMatching("bookkeeper").stream().findAny();
+ Optional<String> bookie = allBookies().stream().findAny();
if (bookie.isPresent()) {
return bookie.get();
} else {
@@ -161,11 +162,15 @@ public class BookKeeperClusterUtils {
}
public static void runOnAllBookies(DockerClient docker, String... cmds)
throws Exception {
- for (String b : DockerUtils.cubeIdsMatching("bookkeeper")) {
+ for (String b : allBookies()) {
DockerUtils.runCommand(docker, b, cmds);
}
}
+ public static Set<String> allBookies() {
+ return DockerUtils.cubeIdsMatching("bookkeeper");
+ }
+
private static boolean waitBookieState(DockerClient docker, String
containerId,
int timeout, TimeUnit timeoutUnit,
boolean upOrDown) {
@@ -219,7 +224,7 @@ public class BookKeeperClusterUtils {
public static boolean startAllBookiesWithVersion(DockerClient docker,
String version)
throws Exception {
- return DockerUtils.cubeIdsMatching("bookkeeper").stream()
+ return allBookies().stream()
.map((b) -> startBookieWithVersion(docker, b, version))
.reduce(true, BookKeeperClusterUtils::allTrue);
}
@@ -235,13 +240,13 @@ public class BookKeeperClusterUtils {
}
public static boolean stopAllBookies(DockerClient docker) {
- return DockerUtils.cubeIdsMatching("bookkeeper").stream()
+ return allBookies().stream()
.map((b) -> stopBookie(docker, b))
.reduce(true, BookKeeperClusterUtils::allTrue);
}
public static boolean waitAllBookieUp(DockerClient docker) {
- return DockerUtils.cubeIdsMatching("bookkeeper").stream()
+ return allBookies().stream()
.map((b) -> waitBookieUp(docker, b, 10, TimeUnit.SECONDS))
.reduce(true, BookKeeperClusterUtils::allTrue);
}
diff --git
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
index 276b842..05ff030 100644
---
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
+++
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
@@ -30,8 +30,8 @@ import org.junit.Test;
@Slf4j
public abstract class BookieShellTestBase {
- private String currentVersion = System.getProperty("currentVersion");
- private String bkScript;
+ String currentVersion = System.getProperty("currentVersion");
+ String bkScript;
@Before
public void setup() {
diff --git
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
index 8b138c7..47fbc55 100644
---
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
+++
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
@@ -18,14 +18,23 @@
package org.apache.bookkeeper.tests.integration;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertTrue;
import com.github.dockerjava.api.DockerClient;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.tests.integration.utils.BookKeeperClusterUtils;
import org.apache.bookkeeper.tests.integration.utils.DockerUtils;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.arquillian.test.api.ArquillianResource;
+import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -44,6 +53,8 @@ public class TestBookieShellCluster extends
BookieShellTestBase {
private String currentVersion = System.getProperty("currentVersion");
+ private static final byte[] PASSWORD = "foobar".getBytes(UTF_8);
+
@Test
@Override
public void test000_Setup() throws Exception {
@@ -83,4 +94,92 @@ public class TestBookieShellCluster extends
BookieShellTestBase {
public void test003_ListRWBookies() throws Exception {
super.test003_ListRWBookies();
}
+
+ private static long writeNEntries(BookKeeper bk, int n, int numBookies)
throws Exception {
+ try (WriteHandle writer =
bk.newCreateLedgerOp().withEnsembleSize(numBookies)
+ .withWriteQuorumSize(numBookies).withAckQuorumSize(numBookies)
+ .withPassword(PASSWORD).execute().get()) {
+ int i = 0;
+ for (; i < n - 1; i++) {
+ writer.appendAsync(("entry" + i).getBytes(UTF_8));
+ }
+ writer.append(("entry" + i).getBytes(UTF_8));
+
+ return writer.getId();
+ }
+ }
+
+ private static void validateNEntries(BookKeeper bk, long ledgerId, int n)
throws Exception {
+ try (ReadHandle reader = bk.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withPassword(PASSWORD)
+ .execute().get();
+ LedgerEntries entries = reader.read(0, n - 1)) {
+ Assert.assertEquals(reader.getLastAddConfirmed(), n - 1);
+
+ for (int i = 0; i < n; i++) {
+ Assert.assertEquals("entry" + i, new
String(entries.getEntry(i).getEntryBytes(), UTF_8));
+ }
+ }
+ }
+
+ /**
+ * These tests depend on being able to access cluster internals, so can't be put
in test base.
+ */
+ @Test
+ public void test101_RegenerateIndex() throws Exception {
+ String zookeeper = String.format("zk+hierarchical://%s/ledgers",
+
BookKeeperClusterUtils.zookeeperConnectString(docker));
+ int numEntries = 100;
+
+ try (BookKeeper bk = BookKeeper.newBuilder(
+ new
ClientConfiguration().setMetadataServiceUri(zookeeper)).build()) {
+ log.info("Writing entries");
+ long ledgerId1 = writeNEntries(bk, numEntries,
BookKeeperClusterUtils.allBookies().size());
+ long ledgerId2 = writeNEntries(bk, numEntries,
BookKeeperClusterUtils.allBookies().size());
+
+ log.info("Validate that we can read back");
+ validateNEntries(bk, ledgerId1, numEntries);
+ validateNEntries(bk, ledgerId2, numEntries);
+
+ String indexFileName1 =
String.format("/opt/bookkeeper/data/ledgers/current/0/%d/%d.idx",
+ ledgerId1, ledgerId1);
+ String indexFileName2 =
String.format("/opt/bookkeeper/data/ledgers/current/0/%d/%d.idx",
+ ledgerId2, ledgerId2);
+
+ log.info("Stop bookies to flush, delete the index and start
again");
+ assertTrue(BookKeeperClusterUtils.stopAllBookies(docker));
+
+ BookKeeperClusterUtils.runOnAllBookies(docker, "rm",
indexFileName1, indexFileName2);
+
assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker,
currentVersion));
+
+ log.info("Validate that we cannot read back");
+ try {
+ validateNEntries(bk, ledgerId1, numEntries);
+ Assert.fail("Shouldn't have been able to find anything");
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ // expected
+ }
+ try {
+ validateNEntries(bk, ledgerId2, numEntries);
+ Assert.fail("Shouldn't have been able to find anything");
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ // expected
+ }
+
+ assertTrue(BookKeeperClusterUtils.stopAllBookies(docker));
+
+ log.info("Regenerate the index file");
+ BookKeeperClusterUtils.runOnAllBookies(docker,
+ bkScript, "shell",
"regenerate-interleaved-storage-index-file",
+ "--ledgerIds", String.format("%d,%d", ledgerId1,
ledgerId2),
+ "--password", new String(PASSWORD, UTF_8));
+
assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker,
currentVersion));
+
+ log.info("Validate that we can read back, after regeneration");
+ validateNEntries(bk, ledgerId1, numEntries);
+ validateNEntries(bk, ledgerId2, numEntries);
+ }
+ }
+
}