This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f0a198e2b3 stage 9: bugfix indexDirs for LedgersIndexRebuildOp (#3403)
f0a198e2b3 is described below

commit f0a198e2b35d2c3b9451dd4e50373b274b42f0f8
Author: StevenLuMT <[email protected]>
AuthorDate: Fri Jul 15 15:17:38 2022 +0800

    stage 9: bugfix indexDirs for LedgersIndexRebuildOp (#3403)
    
    Co-authored-by: lushiji <[email protected]>
---
 .../bookie/storage/ldb/LedgersIndexRebuildOp.java  | 120 ++++++++++---------
 .../storage/ldb/LedgersIndexRebuildOpTest.java     | 130 +++++++++++++++++++++
 .../tools/cli/helpers/CommandTestBase.java         |   2 +-
 3 files changed, 199 insertions(+), 53 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
index 1f5cf8de4c..2895c37f83 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
@@ -75,68 +75,84 @@ public class LedgersIndexRebuildOp {
     @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE")
     public boolean initiate()  {
         LOG.info("Starting ledger index rebuilding");
+        File[] indexDirs = conf.getIndexDirs();
+        if (indexDirs == null) {
+            indexDirs = conf.getLedgerDirs();
+        }
+        if (indexDirs.length != conf.getLedgerDirs().length) {
+            LOG.error("ledger and index dirs size not matched");
+            return false;
+        }
 
-        String timestamp = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
-        String basePath = 
BookieImpl.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
-        String tempLedgersSubPath = LedgersSubPath + ".TEMP-" + timestamp;
-        Path tempPath = FileSystems.getDefault().getPath(basePath, 
tempLedgersSubPath);
-        Path currentPath = FileSystems.getDefault().getPath(basePath, 
LedgersSubPath);
-
-        LOG.info("Starting scan phase (scans journal and entry log files)");
+        for (int i = 0; i < indexDirs.length; i++) {
+            File indexDir = indexDirs[i];
+            File ledgerDir = conf.getLedgerDirs()[i];
+
+            String timestamp = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+            String indexBasePath = 
BookieImpl.getCurrentDirectory(indexDir).toString();
+            String tempLedgersSubPath = LedgersSubPath + ".TEMP-" + timestamp;
+            Path indexTempPath = 
FileSystems.getDefault().getPath(indexBasePath, tempLedgersSubPath);
+            Path indexCurrentPath = 
FileSystems.getDefault().getPath(indexBasePath, LedgersSubPath);
+
+            LOG.info("Starting scan phase (scans journal and entry log 
files)");
+
+            try {
+                Set<Long> ledgers = new HashSet<>();
+                scanJournals(ledgers);
+                File[] lDirs = new File[1];
+                lDirs[0] = ledgerDir;
+                scanEntryLogFiles(ledgers, lDirs);
+
+                LOG.info("Scan complete, found {} ledgers. "
+                        + "Starting to build a new ledgers index", 
ledgers.size());
+
+                try (KeyValueStorage newIndex = 
KeyValueStorageRocksDB.factory.newKeyValueStorage(
+                        indexBasePath, tempLedgersSubPath, 
DbConfigType.Default, conf)) {
+                    LOG.info("Created ledgers index at temp location {}", 
indexTempPath);
+
+                    for (Long ledgerId : ledgers) {
+                        DbLedgerStorageDataFormats.LedgerData ledgerData =
+                                
DbLedgerStorageDataFormats.LedgerData.newBuilder()
+                                        .setExists(true)
+                                        .setFenced(true)
+                                        
.setMasterKey(ByteString.EMPTY).build();
+
+                        byte[] ledgerArray = new byte[16];
+                        ArrayUtil.setLong(ledgerArray, 0, ledgerId);
+                        newIndex.put(ledgerArray, ledgerData.toByteArray());
+                    }
 
-        try {
-            Set<Long> ledgers = new HashSet<>();
-            scanJournals(ledgers);
-            scanEntryLogFiles(ledgers);
-
-            LOG.info("Scan complete, found {} ledgers. "
-                    + "Starting to build a new ledgers index", ledgers.size());
-
-            try (KeyValueStorage newIndex = 
KeyValueStorageRocksDB.factory.newKeyValueStorage(
-                    basePath, tempLedgersSubPath, DbConfigType.Default, conf)) 
{
-                LOG.info("Created ledgers index at temp location {}", 
tempPath);
-
-                for (Long ledgerId : ledgers) {
-                    DbLedgerStorageDataFormats.LedgerData ledgerData =
-                            DbLedgerStorageDataFormats.LedgerData.newBuilder()
-                                    .setExists(true)
-                                    .setFenced(true)
-                                    .setMasterKey(ByteString.EMPTY).build();
-
-                    byte[] ledgerArray = new byte[16];
-                    ArrayUtil.setLong(ledgerArray, 0, ledgerId);
-                    newIndex.put(ledgerArray, ledgerData.toByteArray());
+                    newIndex.sync();
                 }
-
-                newIndex.sync();
+            } catch (Throwable t) {
+                LOG.error("Error during rebuild, the original index remains 
unchanged", t);
+                delete(indexTempPath);
+                return false;
             }
-        } catch (Throwable t) {
-            LOG.error("Error during rebuild, the original index remains 
unchanged", t);
-            delete(tempPath);
-            return false;
-        }
 
-        // replace the existing index
-        try {
-            Path prevPath = FileSystems.getDefault().getPath(basePath, 
LedgersSubPath + ".PREV-" + timestamp);
-            LOG.info("Moving original index from original location: {} up to 
back-up location: {}",
-                    currentPath, prevPath);
-            Files.move(currentPath, prevPath);
-            LOG.info("Moving rebuilt index from: {} to: {}", tempPath, 
currentPath);
-            Files.move(tempPath, currentPath);
-            LOG.info("Original index has been replaced with the new index. "
-                    + "The original index has been moved to {}", prevPath);
-        } catch (IOException e) {
-            LOG.error("Could not replace original index with rebuilt index. "
-                    + "To return to the original state, ensure the original 
index is in its original location", e);
-            return false;
+            // replace the existing index
+            try {
+                Path prevPath = FileSystems.getDefault().getPath(indexBasePath,
+                        LedgersSubPath + ".PREV-" + timestamp);
+                LOG.info("Moving original index from original location: {} up 
to back-up location: {}",
+                        indexCurrentPath, prevPath);
+                Files.move(indexCurrentPath, prevPath);
+                LOG.info("Moving rebuilt index from: {} to: {}", 
indexTempPath, indexCurrentPath);
+                Files.move(indexTempPath, indexCurrentPath);
+                LOG.info("Original index has been replaced with the new index. 
"
+                        + "The original index has been moved to {}", prevPath);
+            } catch (IOException e) {
+                LOG.error("Could not replace original index with rebuilt 
index. "
+                        + "To return to the original state, ensure the 
original index is in its original location", e);
+                return false;
+            }
         }
 
         return true;
     }
 
-    private void scanEntryLogFiles(Set<Long> ledgers) throws IOException {
-        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new 
LedgerDirsManager(conf, conf.getLedgerDirs(),
+    private void scanEntryLogFiles(Set<Long> ledgers, File[] lDirs) throws 
IOException {
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new 
LedgerDirsManager(conf, lDirs,
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold())));
         Set<Long> entryLogs = entryLogger.getEntryLogsSet();
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOpTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOpTest.java
new file mode 100644
index 0000000000..9fad4cc3b1
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOpTest.java
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.TmpDirs;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for class {@link LocationsIndexRebuildOp}.
+ */
+public class LedgersIndexRebuildOpTest {
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) 
throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+
+        @Override
+        public void start() {
+            // no-op
+        }
+    };
+
+    protected final TmpDirs tmpDirs = new TmpDirs();
+    private String newDirectory() throws Exception {
+        File d = tmpDirs.createNew("bkTest", ".dir");
+        d.delete();
+        d.mkdir();
+        File curDir = BookieImpl.getCurrentDirectory(d);
+        BookieImpl.checkDirectoryStructure(curDir);
+        return d.getPath();
+    }
+
+    @Test
+    public void testMultiLedgerIndexDiffDirs() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(new String[] { newDirectory(), newDirectory() 
});
+        conf.setIndexDirName(new String[] { newDirectory(), newDirectory() });
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        DiskChecker diskChecker = new 
DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(), diskChecker);
+        LedgerDirsManager indexDirsManager = new LedgerDirsManager(conf, 
conf.getIndexDirs(), diskChecker);
+
+        DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, 
indexDirsManager,
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
+        ledgerStorage.setCheckpointer(checkpointer);
+        ledgerStorage.setCheckpointSource(checkpointSource);
+
+        // Insert some ledger & entries in the storage
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            ledgerStorage.setMasterKey(ledgerId, ("ledger-" + 
ledgerId).getBytes());
+            ledgerStorage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 100; entryId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ledgerStorage.addEntry(entry);
+            }
+        }
+
+        ledgerStorage.flush();
+        ledgerStorage.shutdown();
+
+        // Rebuild index through the tool
+        Assert.assertTrue(new LedgersIndexRebuildOp(conf, true).initiate());
+
+        // clean test data
+        List<String> toDeleted = Lists.newArrayList(conf.getLedgerDirNames());
+        toDeleted.addAll(Lists.newArrayList(conf.getIndexDirNames()));
+        toDeleted.forEach(d -> {
+            try {
+                FileUtils.forceDelete(new File(d));
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        });
+    }
+}
diff --git 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
index 39423496fb..2ce56caf3d 100644
--- 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
+++ 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
@@ -107,7 +107,7 @@ public class CommandTestBase extends MockCommandSupport {
                 doCallRealMethod().when(serverConfiguration).getLedgerDirs();
             }
             if (indexDirNames != null) {
-                
doReturn(indexDirNames).when(serverConfiguration).getLedgerDirNames();
+                
doReturn(indexDirNames).when(serverConfiguration).getIndexDirNames();
                 doCallRealMethod().when(serverConfiguration).getIndexDirs();
             }
             
doReturn(defaultConf.getDiskUsageThreshold()).when(serverConfiguration).getDiskUsageThreshold();

Reply via email to