This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f0a198e2b3 stage 9: bugfix indexDirs for LedgersIndexRebuildOp (#3403)
f0a198e2b3 is described below
commit f0a198e2b35d2c3b9451dd4e50373b274b42f0f8
Author: StevenLuMT <[email protected]>
AuthorDate: Fri Jul 15 15:17:38 2022 +0800
stage 9: bugfix indexDirs for LedgersIndexRebuildOp (#3403)
Co-authored-by: lushiji <[email protected]>
---
.../bookie/storage/ldb/LedgersIndexRebuildOp.java | 120 ++++++++++---------
.../storage/ldb/LedgersIndexRebuildOpTest.java | 130 +++++++++++++++++++++
.../tools/cli/helpers/CommandTestBase.java | 2 +-
3 files changed, 199 insertions(+), 53 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
index 1f5cf8de4c..2895c37f83 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
@@ -75,68 +75,84 @@ public class LedgersIndexRebuildOp {
@SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE")
public boolean initiate() {
LOG.info("Starting ledger index rebuilding");
+ File[] indexDirs = conf.getIndexDirs();
+ if (indexDirs == null) {
+ indexDirs = conf.getLedgerDirs();
+ }
+ if (indexDirs.length != conf.getLedgerDirs().length) {
+ LOG.error("ledger and index dirs size not matched");
+ return false;
+ }
- String timestamp = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
- String basePath =
BookieImpl.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
- String tempLedgersSubPath = LedgersSubPath + ".TEMP-" + timestamp;
- Path tempPath = FileSystems.getDefault().getPath(basePath,
tempLedgersSubPath);
- Path currentPath = FileSystems.getDefault().getPath(basePath,
LedgersSubPath);
-
- LOG.info("Starting scan phase (scans journal and entry log files)");
+ for (int i = 0; i < indexDirs.length; i++) {
+ File indexDir = indexDirs[i];
+ File ledgerDir = conf.getLedgerDirs()[i];
+
+ String timestamp = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+ String indexBasePath =
BookieImpl.getCurrentDirectory(indexDir).toString();
+ String tempLedgersSubPath = LedgersSubPath + ".TEMP-" + timestamp;
+ Path indexTempPath =
FileSystems.getDefault().getPath(indexBasePath, tempLedgersSubPath);
+ Path indexCurrentPath =
FileSystems.getDefault().getPath(indexBasePath, LedgersSubPath);
+
+ LOG.info("Starting scan phase (scans journal and entry log
files)");
+
+ try {
+ Set<Long> ledgers = new HashSet<>();
+ scanJournals(ledgers);
+ File[] lDirs = new File[1];
+ lDirs[0] = ledgerDir;
+ scanEntryLogFiles(ledgers, lDirs);
+
+ LOG.info("Scan complete, found {} ledgers. "
+ + "Starting to build a new ledgers index",
ledgers.size());
+
+ try (KeyValueStorage newIndex =
KeyValueStorageRocksDB.factory.newKeyValueStorage(
+ indexBasePath, tempLedgersSubPath,
DbConfigType.Default, conf)) {
+ LOG.info("Created ledgers index at temp location {}",
indexTempPath);
+
+ for (Long ledgerId : ledgers) {
+ DbLedgerStorageDataFormats.LedgerData ledgerData =
+
DbLedgerStorageDataFormats.LedgerData.newBuilder()
+ .setExists(true)
+ .setFenced(true)
+
.setMasterKey(ByteString.EMPTY).build();
+
+ byte[] ledgerArray = new byte[16];
+ ArrayUtil.setLong(ledgerArray, 0, ledgerId);
+ newIndex.put(ledgerArray, ledgerData.toByteArray());
+ }
- try {
- Set<Long> ledgers = new HashSet<>();
- scanJournals(ledgers);
- scanEntryLogFiles(ledgers);
-
- LOG.info("Scan complete, found {} ledgers. "
- + "Starting to build a new ledgers index", ledgers.size());
-
- try (KeyValueStorage newIndex =
KeyValueStorageRocksDB.factory.newKeyValueStorage(
- basePath, tempLedgersSubPath, DbConfigType.Default, conf))
{
- LOG.info("Created ledgers index at temp location {}",
tempPath);
-
- for (Long ledgerId : ledgers) {
- DbLedgerStorageDataFormats.LedgerData ledgerData =
- DbLedgerStorageDataFormats.LedgerData.newBuilder()
- .setExists(true)
- .setFenced(true)
- .setMasterKey(ByteString.EMPTY).build();
-
- byte[] ledgerArray = new byte[16];
- ArrayUtil.setLong(ledgerArray, 0, ledgerId);
- newIndex.put(ledgerArray, ledgerData.toByteArray());
+ newIndex.sync();
}
-
- newIndex.sync();
+ } catch (Throwable t) {
+ LOG.error("Error during rebuild, the original index remains
unchanged", t);
+ delete(indexTempPath);
+ return false;
}
- } catch (Throwable t) {
- LOG.error("Error during rebuild, the original index remains
unchanged", t);
- delete(tempPath);
- return false;
- }
- // replace the existing index
- try {
- Path prevPath = FileSystems.getDefault().getPath(basePath,
LedgersSubPath + ".PREV-" + timestamp);
- LOG.info("Moving original index from original location: {} up to
back-up location: {}",
- currentPath, prevPath);
- Files.move(currentPath, prevPath);
- LOG.info("Moving rebuilt index from: {} to: {}", tempPath,
currentPath);
- Files.move(tempPath, currentPath);
- LOG.info("Original index has been replaced with the new index. "
- + "The original index has been moved to {}", prevPath);
- } catch (IOException e) {
- LOG.error("Could not replace original index with rebuilt index. "
- + "To return to the original state, ensure the original
index is in its original location", e);
- return false;
+ // replace the existing index
+ try {
+ Path prevPath = FileSystems.getDefault().getPath(indexBasePath,
+ LedgersSubPath + ".PREV-" + timestamp);
+ LOG.info("Moving original index from original location: {} up
to back-up location: {}",
+ indexCurrentPath, prevPath);
+ Files.move(indexCurrentPath, prevPath);
+ LOG.info("Moving rebuilt index from: {} to: {}",
indexTempPath, indexCurrentPath);
+ Files.move(indexTempPath, indexCurrentPath);
+ LOG.info("Original index has been replaced with the new index.
"
+ + "The original index has been moved to {}", prevPath);
+ } catch (IOException e) {
+ LOG.error("Could not replace original index with rebuilt
index. "
+ + "To return to the original state, ensure the
original index is in its original location", e);
+ return false;
+ }
}
return true;
}
- private void scanEntryLogFiles(Set<Long> ledgers) throws IOException {
- DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
+ private void scanEntryLogFiles(Set<Long> ledgers, File[] lDirs) throws
IOException {
+ DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new
LedgerDirsManager(conf, lDirs,
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold())));
Set<Long> entryLogs = entryLogger.getEntryLogsSet();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOpTest.java
new file mode 100644
index 0000000000..9fad4cc3b1
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOpTest.java
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.TmpDirs;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for class {@link LedgersIndexRebuildOp}.
+ */
+public class LedgersIndexRebuildOpTest {
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+
+ @Override
+ public void start() {
+ // no-op
+ }
+ };
+
+ protected final TmpDirs tmpDirs = new TmpDirs();
+ private String newDirectory() throws Exception {
+ File d = tmpDirs.createNew("bkTest", ".dir");
+ d.delete();
+ d.mkdir();
+ File curDir = BookieImpl.getCurrentDirectory(d);
+ BookieImpl.checkDirectoryStructure(curDir);
+ return d.getPath();
+ }
+
+ @Test
+ public void testMultiLedgerIndexDiffDirs() throws Exception {
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setLedgerDirNames(new String[] { newDirectory(), newDirectory()
});
+ conf.setIndexDirName(new String[] { newDirectory(), newDirectory() });
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ DiskChecker diskChecker = new
DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf,
conf.getLedgerDirs(), diskChecker);
+ LedgerDirsManager indexDirsManager = new LedgerDirsManager(conf,
conf.getIndexDirs(), diskChecker);
+
+ DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
indexDirsManager,
+ NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
+ ledgerStorage.setCheckpointer(checkpointer);
+ ledgerStorage.setCheckpointSource(checkpointSource);
+
+ // Insert some ledgers & entries into the storage
+ for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ ledgerStorage.setMasterKey(ledgerId, ("ledger-" +
ledgerId).getBytes());
+ ledgerStorage.setFenced(ledgerId);
+
+ for (long entryId = 0; entryId < 100; entryId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ ledgerStorage.addEntry(entry);
+ }
+ }
+
+ ledgerStorage.flush();
+ ledgerStorage.shutdown();
+
+ // Rebuild index through the tool
+ Assert.assertTrue(new LedgersIndexRebuildOp(conf, true).initiate());
+
+ // clean test data
+ List<String> toDeleted = Lists.newArrayList(conf.getLedgerDirNames());
+ toDeleted.addAll(Lists.newArrayList(conf.getIndexDirNames()));
+ toDeleted.forEach(d -> {
+ try {
+ FileUtils.forceDelete(new File(d));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+}
diff --git
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
index 39423496fb..2ce56caf3d 100644
---
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
+++
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
@@ -107,7 +107,7 @@ public class CommandTestBase extends MockCommandSupport {
doCallRealMethod().when(serverConfiguration).getLedgerDirs();
}
if (indexDirNames != null) {
-
doReturn(indexDirNames).when(serverConfiguration).getLedgerDirNames();
+
doReturn(indexDirNames).when(serverConfiguration).getIndexDirNames();
doCallRealMethod().when(serverConfiguration).getIndexDirs();
}
doReturn(defaultConf.getDiskUsageThreshold()).when(serverConfiguration).getDiskUsageThreshold();