sijie closed pull request #1642: Utility to rebuild interleaved storage index
files
URL: https://github.com/apache/bookkeeper/pull/1642
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index e9e4d15023..3795b37b91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -629,17 +629,10 @@ public Bookie(ServerConfiguration conf, StatsLogger
statsLogger)
for (File journalDirectory : conf.getJournalDirs()) {
this.journalDirectories.add(getCurrentDirectory(journalDirectory));
}
- DiskChecker diskChecker = new
DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
- this.ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(), diskChecker,
- statsLogger.scope(LD_LEDGER_SCOPE));
-
- File[] idxDirs = conf.getIndexDirs();
- if (null == idxDirs) {
- this.indexDirsManager = this.ledgerDirsManager;
- } else {
- this.indexDirsManager = new LedgerDirsManager(conf, idxDirs,
diskChecker,
- statsLogger.scope(LD_INDEX_SCOPE));
- }
+ DiskChecker diskChecker = createDiskChecker(conf);
+ this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker,
statsLogger.scope(LD_LEDGER_SCOPE));
+ this.indexDirsManager = createIndexDirsManager(conf, diskChecker,
statsLogger.scope(LD_INDEX_SCOPE),
+ this.ledgerDirsManager);
// instantiate zookeeper client to initialize ledger manager
this.metadataDriver = instantiateMetadataDriver(conf);
@@ -675,7 +668,7 @@ public Bookie(ServerConfiguration conf, StatsLogger
statsLogger)
}
}
- if (null == idxDirs) {
+ if (ledgerDirsManager == indexDirsManager) {
this.idxMonitor = this.ledgerMonitor;
} else {
this.idxMonitor = new LedgerDirsMonitor(conf, diskChecker,
indexDirsManager);
@@ -1546,4 +1539,22 @@ public int getExitCode() {
return exitCode;
}
+ /**
+ * Creates a {@link DiskChecker} from the disk usage threshold and the disk usage
+ * warn threshold found in {@code conf}.
+ */
+ static DiskChecker createDiskChecker(ServerConfiguration conf) {
+ return new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold());
+ }
+
+ /**
+ * Creates the {@link LedgerDirsManager} over the ledger directories listed in {@code conf},
+ * using the given disk checker and stats logger.
+ */
+ static LedgerDirsManager createLedgerDirsManager(ServerConfiguration conf,
DiskChecker diskChecker,
+ StatsLogger statsLogger) {
+ return new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker,
statsLogger);
+ }
+
+    /**
+     * Creates the directories manager used for index files.
+     *
+     * <p>If the configuration defines no index directories, {@code fallback} is returned as-is
+     * (the very same instance, not a copy); otherwise a new manager is built over the
+     * configured index directories.
+     */
+    static LedgerDirsManager createIndexDirsManager(ServerConfiguration conf, DiskChecker diskChecker,
+                                                    StatsLogger statsLogger, LedgerDirsManager fallback) {
+        final File[] indexDirs = conf.getIndexDirs();
+        if (indexDirs != null) {
+            return new LedgerDirsManager(conf, indexDirs, diskChecker, statsLogger);
+        }
+        // No dedicated index directories: hand back the caller's manager unchanged.
+        return fallback;
+    }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 4434cfc3ae..8eebce58be 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -18,7 +18,7 @@
package org.apache.bookkeeper.bookie;
-import static com.google.common.base.Charsets.UTF_8;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver;
import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
@@ -51,6 +51,7 @@
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -72,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
@@ -187,6 +189,7 @@
static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE =
"convert-to-interleaved-storage";
static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX =
"rebuild-db-ledger-locations-index";
+ static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE =
"regenerate-interleaved-storage-index-file";
static final String CMD_HELP = "help";
final ServerConfiguration bkConf = new ServerConfiguration();
@@ -2816,6 +2819,69 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}
+ /**
+ * Regenerate an index file for interleaved storage.
+ *
+ * <p>Needs the ids of the ledgers to rebuild (option {@code ledgerIds}) and the ledger
+ * password (option {@code password}, or {@code b64password} for passwords that are not
+ * UTF-8 text). With {@code dryRun} the entry logs are processed but nothing is written.
+ */
+ class RegenerateInterleavedStorageIndexFile extends MyCommand {
+ Options opts = new Options();
+
+ public RegenerateInterleavedStorageIndexFile() {
+ super(CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE);
+ Option ledgerOption = new Option("l", "ledgerIds", true,
+ "Ledger(s) whose index needs to
be regenerated."
+ + " Multiple can be specified,
comma separated.");
+ ledgerOption.setRequired(true);
+ ledgerOption.setValueSeparator(',');
+ ledgerOption.setArgs(Option.UNLIMITED_VALUES);
+
+ opts.addOption(ledgerOption);
+ opts.addOption("dryRun", false,
+ "Process the entryLogger, but don't write
anything.");
+ opts.addOption("password", true,
+ "The bookie stores the password in the index file,
so we need it to regenerate. "
+ + "This must match the value in the ledger
metadata.");
+ opts.addOption("b64password", true,
+ "The password in base64 encoding, for cases where
the password is not UTF-8.");
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Regenerate an interleaved storage index file, from
available entrylogger files.";
+ }
+
+ @Override
+ String getUsage() {
+ return CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE;
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ byte[] password;
+ // "password" is checked first, so it takes precedence when both password options are given.
+ if (cmdLine.hasOption("password")) {
+ password = cmdLine.getOptionValue("password").getBytes(UTF_8);
+ } else if (cmdLine.hasOption("b64password")) {
+ password =
Base64.getDecoder().decode(cmdLine.getOptionValue("b64password"));
+ } else {
+ LOG.error("The password must be specified to regenerate the
index file.");
+ return 1;
+ }
+ // Collected into a Set, so an id that is listed more than once is processed only once.
+ Set<Long> ledgerIds =
Arrays.stream(cmdLine.getOptionValues("ledgerIds"))
+ .map((id) -> Long.parseLong(id)).collect(Collectors.toSet());
+ boolean dryRun = cmdLine.hasOption("dryRun");
+
+ LOG.info("=== Rebuilding index file for {} ===", ledgerIds);
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ new InterleavedStorageRegenerateIndexOp(conf, ledgerIds,
password).initiate(dryRun);
+ LOG.info("-- Done rebuilding index file for {} --", ledgerIds);
+ return 0;
+ }
+ }
+
final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
{
@@ -2849,6 +2915,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new
ConvertToInterleavedStorageCmd());
commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new
RebuildDbLedgerLocationsIndexCmd());
+ commands.put(CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE, new
RegenerateInterleavedStorageIndexFile());
commands.put(CMD_HELP, new HelpCmd());
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new
LostBookieRecoveryDelayCmd());
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
new file mode 100644
index 0000000000..de48eafe5d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -0,0 +1,230 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan all entries in the entry logs and rebuild the index files for the given set of ledgers.
+ */
+public class InterleavedStorageRegenerateIndexOp {
+ private static final Logger LOG =
LoggerFactory.getLogger(InterleavedStorageRegenerateIndexOp.class);
+
+ private final ServerConfiguration conf;
+ private final Set<Long> ledgerIds;
+ private final byte[] masterKey;
+
+ /**
+ * Creates the operation.
+ *
+ * @param conf bookie configuration, stored for later use
+ * @param ledgerIds ids of the ledgers to process, stored for later use
+ * @param password ledger password; it is not stored itself, only the master key that
+ * {@link DigestManager#generateMasterKey(byte[])} derives from it
+ * @throws NoSuchAlgorithmException propagated from the master key generation
+ */
+ public InterleavedStorageRegenerateIndexOp(ServerConfiguration conf,
Set<Long> ledgerIds, byte[] password)
+ throws NoSuchAlgorithmException {
+ this.conf = conf;
+ this.ledgerIds = ledgerIds;
+ this.masterKey = DigestManager.generateMasterKey(password);
+ }
+
+    /**
+     * Running tally of the entries registered for one ledger: how many were registered and the
+     * smallest and largest entry id among them.
+     *
+     * <p>Until the first entry is registered, the first entry is {@link Long#MAX_VALUE} and the
+     * last entry is {@link Long#MIN_VALUE}.
+     */
+    static class RecoveryStats {
+        long firstEntry = Long.MAX_VALUE;
+        long lastEntry = Long.MIN_VALUE;
+        long numEntries = 0;
+
+        /** Counts one more entry and widens the [first, last] range to include {@code entryId}. */
+        void registerEntry(long entryId) {
+            firstEntry = Math.min(firstEntry, entryId);
+            lastEntry = Math.max(lastEntry, entryId);
+            numEntries += 1;
+        }
+
+        /** Returns the number of entries registered so far. */
+        long getNumEntries() {
+            return numEntries;
+        }
+
+        /** Returns the smallest entry id registered so far. */
+        long getFirstEntry() {
+            return firstEntry;
+        }
+
+        /** Returns the largest entry id registered so far. */
+        long getLastEntry() {
+            return lastEntry;
+        }
+    }
+
+    /**
+     * Scans every entry log returned by the entry logger and, for each entry that belongs to one
+     * of the requested ledgers, records the entry's location in the ledger cache.
+     *
+     * <p>When the cache reports that a ledger does not exist yet, the master key derived from the
+     * supplied password is stored and the ledger is marked as fenced before the entry offset is
+     * written. The cache is flushed after each entry log, and a per-ledger summary plus the total
+     * elapsed time are logged once all logs have been scanned.
+     *
+     * @param dryRun if {@code true}, a {@link DryRunLedgerCache} replaces the real ledger cache, so
+     *               the entry logs are scanned and statistics are reported but no index is written
+     * @throws IOException if scanning an entry log or updating the ledger cache fails
+     */
+    public void initiate(boolean dryRun) throws IOException {
+        LOG.info("Starting index rebuilding");
+
+        DiskChecker diskChecker = Bookie.createDiskChecker(conf);
+        LedgerDirsManager ledgerDirsManager = Bookie.createLedgerDirsManager(
+                conf, diskChecker, NullStatsLogger.INSTANCE);
+        LedgerDirsManager indexDirsManager = Bookie.createIndexDirsManager(
+                conf, diskChecker, NullStatsLogger.INSTANCE, ledgerDirsManager);
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        final LedgerCache ledgerCache;
+        if (dryRun) {
+            ledgerCache = new DryRunLedgerCache();
+        } else {
+            ledgerCache = new LedgerCacheImpl(conf, new SnapshotMap<Long, Boolean>(),
+                                              indexDirsManager, NullStatsLogger.INSTANCE);
+        }
+
+        Set<Long> entryLogs = entryLogger.getEntryLogsSet();
+
+        int totalEntryLogs = entryLogs.size();
+        int completedEntryLogs = 0;
+        long startTime = System.nanoTime();
+
+        LOG.info("Scanning {} entry logs", totalEntryLogs);
+
+        Map<Long, RecoveryStats> stats = new HashMap<>();
+        for (long entryLogId : entryLogs) {
+            LOG.info("Scanning {}", entryLogId);
+            entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
+                @Override
+                public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
+                    // The entry id is read from byte offset 8 of the entry buffer.
+                    long entryId = entry.getLong(8);
+
+                    stats.computeIfAbsent(ledgerId, (ignore) -> new RecoveryStats()).registerEntry(entryId);
+
+                    // Actual location indexed is pointing past the entry size
+                    long location = (entryLogId << 32L) | (offset + 4);
+
+                    if (LOG.isDebugEnabled()) {
+                        // The upper 32 bits carry the entry log id and the lower 32 bits the
+                        // position, so the position is recovered with a full 32-bit mask.
+                        LOG.debug("Rebuilding {}:{} at location {} / {}", ledgerId, entryId, location >> 32,
+                                location & 0xFFFFFFFFL);
+                    }
+
+                    if (!ledgerCache.ledgerExists(ledgerId)) {
+                        ledgerCache.setMasterKey(ledgerId, masterKey);
+                        ledgerCache.setFenced(ledgerId);
+                    }
+                    ledgerCache.putEntryOffset(ledgerId, entryId, location);
+                }
+
+                @Override
+                public boolean accept(long ledgerId) {
+                    // Only entries of the requested ledgers are processed.
+                    return ledgerIds.contains(ledgerId);
+                }
+            });
+
+            ledgerCache.flushLedger(true);
+
+            ++completedEntryLogs;
+            LOG.info("Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs,
+                    totalEntryLogs);
+        }
+
+        LOG.info("Rebuilding indices done");
+        for (long ledgerId : ledgerIds) {
+            RecoveryStats ledgerStats = stats.get(ledgerId);
+            if (ledgerStats == null || ledgerStats.getNumEntries() == 0) {
+                LOG.info(" {} - No entries found", ledgerId);
+            } else {
+                LOG.info(" {} - Found {} entries, from {} to {}", ledgerId,
+                        ledgerStats.getNumEntries(), ledgerStats.getFirstEntry(), ledgerStats.getLastEntry());
+            }
+        }
+        LOG.info("Total time: {}", DurationFormatUtils.formatDurationHMS(
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)));
+    }
+
+
+ /**
+ * {@link LedgerCache} used for dry runs: every mutating call is a no-op, and every read
+ * accessor other than {@link #ledgerExists(long)} throws
+ * {@link UnsupportedOperationException}.
+ */
+ static class DryRunLedgerCache implements LedgerCache {
+ @Override
+ public void close() {
+ }
+ @Override
+ public boolean setFenced(long ledgerId) throws IOException {
+ return false;
+ }
+ @Override
+ public boolean isFenced(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void setMasterKey(long ledgerId, byte[] masterKey) throws
IOException {
+ }
+ @Override
+ public byte[] readMasterKey(long ledgerId) throws IOException,
BookieException {
+ throw new UnsupportedOperationException();
+ }
+ // Always reports the ledger as missing, since nothing is ever stored here.
+ @Override
+ public boolean ledgerExists(long ledgerId) throws IOException {
+ return false;
+ }
+ @Override
+ public void putEntryOffset(long ledger, long entry, long offset)
throws IOException {
+ }
+ @Override
+ public long getEntryOffset(long ledger, long entry) throws IOException
{
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void flushLedger(boolean doAll) throws IOException {
+ }
+ @Override
+ public long getLastEntry(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Long getLastAddConfirmed(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public long updateLastAddConfirmed(long ledgerId, long lac) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void deleteLedger(long ledgerId) throws IOException {
+ }
+ @Override
+ public void setExplicitLac(long ledgerId, ByteBuf lac) throws
IOException {
+ }
+ @Override
+ public ByteBuf getExplicitLac(long ledgerId) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 741610dcef..a291a48905 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -34,7 +34,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -81,7 +80,6 @@
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
import org.apache.bookkeeper.proto.checksum.DigestManager;
-import org.apache.bookkeeper.proto.checksum.MacDigestManager;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -152,16 +150,6 @@
final Counter lacUpdateMissesCounter;
private final OpStatsLogger clientChannelWriteWaitStats;
- // This empty master key is used when an empty password is provided which
is the hash of an empty string
- private static final byte[] emptyLedgerKey;
- static {
- try {
- emptyLedgerKey = MacDigestManager.genDigest("ledger", new byte[0]);
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
-
public Map<Integer, BookieSocketAddress> getDelayedWriteFailedBookies() {
return delayedWriteFailedBookies;
}
@@ -199,7 +187,7 @@
// If the password is empty, pass the same random ledger key which is
generated by the hash of the empty
// password, so that the bookie can avoid processing the keys for each
entry
- this.ledgerKey = password.length > 0 ?
MacDigestManager.genDigest("ledger", password) : emptyLedgerKey;
+ this.ledgerKey = DigestManager.generateMasterKey(password);
distributionSchedule = new RoundRobinDistributionSchedule(
metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(),
metadata.getEnsembleSize());
this.bookieFailureHistory = CacheBuilder.newBuilder()
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 4c174a8df1..1928637454 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -23,6 +23,7 @@
import io.netty.util.ReferenceCountUtil;
import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -86,6 +87,10 @@ public static DigestManager instantiate(long ledgerId,
byte[] passwd, DigestType
}
}
+    /**
+     * Derives the ledger master key from a password.
+     *
+     * <p>An empty password yields the shared {@code MacDigestManager.EMPTY_LEDGER_KEY} array
+     * rather than a newly computed digest; any other password is digested with the
+     * {@code "ledger"} label.
+     *
+     * @param password the ledger password; must not be {@code null}
+     * @return the master key bytes
+     * @throws NoSuchAlgorithmException propagated from the digest computation
+     */
+    public static byte[] generateMasterKey(byte[] password) throws NoSuchAlgorithmException {
+        if (password.length == 0) {
+            return MacDigestManager.EMPTY_LEDGER_KEY;
+        }
+        return MacDigestManager.genDigest("ledger", password);
+    }
+
/**
* Computes the digest for an entry and put bytes together for sending.
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index 8d830a488d..e71c077eab 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -46,6 +46,15 @@
final byte[] passwd;
+ // Master key used when an empty password is provided: the "ledger" digest of a
+ // zero-length byte array, computed once in the static initializer below.
+ static final byte[] EMPTY_LEDGER_KEY;
+ static {
+ try {
+ EMPTY_LEDGER_KEY = MacDigestManager.genDigest("ledger", new
byte[0]);
+ } catch (NoSuchAlgorithmException e) {
+ // Rethrown unchecked, because a static initializer cannot throw a checked exception.
+ throw new RuntimeException(e);
+ }
+ }
+
private final ThreadLocal<Mac> mac = new ThreadLocal<Mac>() {
@Override
protected Mac initialValue() {
diff --git a/conf/log4j.shell.properties b/conf/log4j.shell.properties
index caec94859e..7f8c00f335 100644
--- a/conf/log4j.shell.properties
+++ b/conf/log4j.shell.properties
@@ -48,3 +48,4 @@ log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.bookkeeper=ERROR
log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
+log4j.logger.org.apache.bookkeeper.bookie.InterleavedStorageRegenerateIndexOp=INFO
\ No newline at end of file
diff --git
a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
index 8d025964b0..2d4fc27cc8 100644
---
a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
+++
b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.Socket;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -136,13 +137,13 @@ public static void updateBookieConf(DockerClient docker,
String containerId,
public static void updateAllBookieConf(DockerClient docker, String
version, String key, String value)
throws Exception {
- for (String b : DockerUtils.cubeIdsMatching("bookkeeper")) {
+ for (String b : allBookies()) {
updateBookieConf(docker, b, version, key, value);
}
}
public static boolean runOnAnyBookie(DockerClient docker, String... cmds)
throws Exception {
- Optional<String> bookie =
DockerUtils.cubeIdsMatching("bookkeeper").stream().findAny();
+ Optional<String> bookie = allBookies().stream().findAny();
if (bookie.isPresent()) {
DockerUtils.runCommand(docker, bookie.get(), cmds);
return true;
@@ -152,7 +153,7 @@ public static boolean runOnAnyBookie(DockerClient docker,
String... cmds) throws
}
public static String getAnyBookie() throws Exception {
- Optional<String> bookie =
DockerUtils.cubeIdsMatching("bookkeeper").stream().findAny();
+ Optional<String> bookie = allBookies().stream().findAny();
if (bookie.isPresent()) {
return bookie.get();
} else {
@@ -161,11 +162,15 @@ public static String getAnyBookie() throws Exception {
}
public static void runOnAllBookies(DockerClient docker, String... cmds)
throws Exception {
- for (String b : DockerUtils.cubeIdsMatching("bookkeeper")) {
+ for (String b : allBookies()) {
DockerUtils.runCommand(docker, b, cmds);
}
}
+ /**
+ * Returns the ids that {@code DockerUtils.cubeIdsMatching("bookkeeper")} reports, i.e. the
+ * containers treated as bookies by the helpers in this class.
+ */
+ public static Set<String> allBookies() {
+ return DockerUtils.cubeIdsMatching("bookkeeper");
+ }
+
private static boolean waitBookieState(DockerClient docker, String
containerId,
int timeout, TimeUnit timeoutUnit,
boolean upOrDown) {
@@ -219,7 +224,7 @@ private static boolean allTrue(boolean accumulator, boolean
result) {
public static boolean startAllBookiesWithVersion(DockerClient docker,
String version)
throws Exception {
- return DockerUtils.cubeIdsMatching("bookkeeper").stream()
+ return allBookies().stream()
.map((b) -> startBookieWithVersion(docker, b, version))
.reduce(true, BookKeeperClusterUtils::allTrue);
}
@@ -235,13 +240,13 @@ public static boolean stopBookie(DockerClient docker,
String containerId) {
}
public static boolean stopAllBookies(DockerClient docker) {
- return DockerUtils.cubeIdsMatching("bookkeeper").stream()
+ return allBookies().stream()
.map((b) -> stopBookie(docker, b))
.reduce(true, BookKeeperClusterUtils::allTrue);
}
public static boolean waitAllBookieUp(DockerClient docker) {
- return DockerUtils.cubeIdsMatching("bookkeeper").stream()
+ return allBookies().stream()
.map((b) -> waitBookieUp(docker, b, 10, TimeUnit.SECONDS))
.reduce(true, BookKeeperClusterUtils::allTrue);
}
diff --git
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
index 276b842f6e..05ff030470 100644
---
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
+++
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/BookieShellTestBase.java
@@ -30,8 +30,8 @@
@Slf4j
public abstract class BookieShellTestBase {
- private String currentVersion = System.getProperty("currentVersion");
- private String bkScript;
+ String currentVersion = System.getProperty("currentVersion");
+ String bkScript;
@Before
public void setup() {
diff --git
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
index 8b138c7b46..47fbc55471 100644
---
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
+++
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestBookieShellCluster.java
@@ -18,14 +18,23 @@
package org.apache.bookkeeper.tests.integration;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertTrue;
import com.github.dockerjava.api.DockerClient;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.tests.integration.utils.BookKeeperClusterUtils;
import org.apache.bookkeeper.tests.integration.utils.DockerUtils;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.arquillian.test.api.ArquillianResource;
+import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -44,6 +53,8 @@
private String currentVersion = System.getProperty("currentVersion");
+ private static final byte[] PASSWORD = "foobar".getBytes(UTF_8);
+
@Test
@Override
public void test000_Setup() throws Exception {
@@ -83,4 +94,92 @@ public void test002_ListROBookies() throws Exception {
public void test003_ListRWBookies() throws Exception {
super.test003_ListRWBookies();
}
+
+    /**
+     * Creates a ledger whose ensemble size, write quorum and ack quorum all equal
+     * {@code numBookies}, writes the entries {@code "entry0"}, {@code "entry1"}, ... into it and
+     * returns the ledger id.
+     *
+     * <p>All entries but the last go through {@code appendAsync}; the last one is written with
+     * {@code append}.
+     */
+    private static long writeNEntries(BookKeeper bk, int n, int numBookies) throws Exception {
+        try (WriteHandle writer = bk.newCreateLedgerOp()
+                .withEnsembleSize(numBookies)
+                .withWriteQuorumSize(numBookies)
+                .withAckQuorumSize(numBookies)
+                .withPassword(PASSWORD)
+                .execute()
+                .get()) {
+            // Index of the entry written with append(); never negative, so one entry is
+            // written even when n is 0.
+            final int lastIndex = Math.max(n - 1, 0);
+            for (int idx = 0; idx < lastIndex; idx++) {
+                writer.appendAsync(("entry" + idx).getBytes(UTF_8));
+            }
+            writer.append(("entry" + lastIndex).getBytes(UTF_8));
+
+            return writer.getId();
+        }
+    }
+
+    /**
+     * Opens {@code ledgerId} with the test password, reads entries {@code 0} to {@code n - 1} and
+     * asserts that the last add confirmed is {@code n - 1} and that entry {@code i} contains
+     * {@code "entry" + i}.
+     */
+    private static void validateNEntries(BookKeeper bk, long ledgerId, int n) throws Exception {
+        try (ReadHandle reader = bk.newOpenLedgerOp()
+                .withLedgerId(ledgerId)
+                .withPassword(PASSWORD)
+                .execute().get();
+             LedgerEntries entries = reader.read(0, n - 1)) {
+            // assertEquals takes (expected, actual); in that order a failure message
+            // reports the two values correctly.
+            Assert.assertEquals(n - 1, reader.getLastAddConfirmed());
+
+            for (int i = 0; i < n; i++) {
+                Assert.assertEquals("entry" + i, new String(entries.getEntry(i).getEntryBytes(), UTF_8));
+            }
+        }
+    }
+
+ /**
+ * These tests depend on being able to access cluster internals, so they can't be put
+ * in the test base.
+ *
+ * <p>Writes two ledgers, deletes their index files on every bookie, checks that reading
+ * fails, runs the regenerate command on every bookie and checks that reading works again.
+ */
+ @Test
+ public void test101_RegenerateIndex() throws Exception {
+ String zookeeper = String.format("zk+hierarchical://%s/ledgers",
+
BookKeeperClusterUtils.zookeeperConnectString(docker));
+ int numEntries = 100;
+
+ try (BookKeeper bk = BookKeeper.newBuilder(
+ new
ClientConfiguration().setMetadataServiceUri(zookeeper)).build()) {
+ log.info("Writing entries");
+ long ledgerId1 = writeNEntries(bk, numEntries,
BookKeeperClusterUtils.allBookies().size());
+ long ledgerId2 = writeNEntries(bk, numEntries,
BookKeeperClusterUtils.allBookies().size());
+
+ log.info("Validate that we can read back");
+ validateNEntries(bk, ledgerId1, numEntries);
+ validateNEntries(bk, ledgerId2, numEntries);
+
+ String indexFileName1 =
String.format("/opt/bookkeeper/data/ledgers/current/0/%d/%d.idx",
+ ledgerId1, ledgerId1);
+ String indexFileName2 =
String.format("/opt/bookkeeper/data/ledgers/current/0/%d/%d.idx",
+ ledgerId2, ledgerId2);
+
+ log.info("Stop bookies to flush, delete the index and start
again");
+ assertTrue(BookKeeperClusterUtils.stopAllBookies(docker));
+
+ BookKeeperClusterUtils.runOnAllBookies(docker, "rm",
indexFileName1, indexFileName2);
+
assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker,
currentVersion));
+
+ log.info("Validate that we cannot read back");
+ try {
+ validateNEntries(bk, ledgerId1, numEntries);
+ Assert.fail("Shouldn't have been able to find anything");
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ // expected
+ }
+ try {
+ validateNEntries(bk, ledgerId2, numEntries);
+ Assert.fail("Shouldn't have been able to find anything");
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ // expected
+ }
+
+ // The bookies are stopped again before the regenerate command is run on them.
+ assertTrue(BookKeeperClusterUtils.stopAllBookies(docker));
+
+ log.info("Regenerate the index file");
+ BookKeeperClusterUtils.runOnAllBookies(docker,
+ bkScript, "shell",
"regenerate-interleaved-storage-index-file",
+ "--ledgerIds", String.format("%d,%d", ledgerId1,
ledgerId2),
+ "--password", new String(PASSWORD, UTF_8));
+
assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker,
currentVersion));
+
+ log.info("Validate that we can read back, after regeneration");
+ validateNEntries(bk, ledgerId1, numEntries);
+ validateNEntries(bk, ledgerId2, numEntries);
+ }
+ }
+
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services