This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e2952b07b stage 5: bugfix indexDirs for readLedgerIndexEntries (#3398)
6e2952b07b is described below

commit 6e2952b07bbd2433dabd7c7e1424ecf3685061c5
Author: StevenLuMT <[email protected]>
AuthorDate: Wed Jul 13 10:45:47 2022 +0800

    stage 5: bugfix indexDirs for readLedgerIndexEntries (#3398)
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    planning for index dir: [mailing list discussion](https://lists.apache.org/thread/r657jf55khl59bbqltj2s95107lbkr0w)
    stage 5:
    1) fix indexDirs for the EntryLocationIndex tool
    
    ### Changes
    
    1. bugfix indexDirs for the EntryLocationIndex tool
    2. add an index-dir test case, testReadLedgerIndexEntries
    3. bugfix indexDirs for TestBase
---
 .../bookie/storage/ldb/DbLedgerStorage.java        |  20 ++-
 .../storage/ldb/DbReadLedgerIndexEntriesTest.java  | 152 +++++++++++++++++++++
 2 files changed, 167 insertions(+), 5 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 959ef33581..18e82f63a2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -467,17 +467,27 @@ public class DbLedgerStorage implements LedgerStorage {
         checkNotNull(serverConf, "ServerConfiguration can't be null");
         checkNotNull(processor, "LedgerLoggger info processor can't null");
 
-        LedgerDirsManager ledgerDirsManager = new 
LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
-                new DiskChecker(serverConf.getDiskUsageThreshold(), 
serverConf.getDiskUsageWarnThreshold()));
+        DiskChecker diskChecker = new 
DiskChecker(serverConf.getDiskUsageThreshold(),
+                serverConf.getDiskUsageWarnThreshold());
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf,
+                serverConf.getLedgerDirs(), diskChecker);
+        LedgerDirsManager indexDirsManager = ledgerDirsManager;
+        File[] idxDirs = serverConf.getIndexDirs();
+        if (null != idxDirs) {
+            indexDirsManager = new LedgerDirsManager(serverConf, idxDirs, 
diskChecker);
+        }
         List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
-
+        List<File> indexDirs = indexDirsManager.getAllLedgerDirs();
+        if (ledgerDirs.size() != indexDirs.size()) {
+            throw new IOException("ledger and index dirs size not matched");
+        }
         int dirIndex = MathUtils.signSafeMod(ledgerId, ledgerDirs.size());
-        String ledgerBasePath = ledgerDirs.get(dirIndex).toString();
+        String indexBasePath = indexDirs.get(dirIndex).toString();
 
         EntryLocationIndex entryLocationIndex = new 
EntryLocationIndex(serverConf,
                 (basePath, subPath, dbConfigType, conf1) ->
                         new KeyValueStorageRocksDB(basePath, subPath, 
DbConfigType.Default, conf1, true),
-                ledgerBasePath, NullStatsLogger.INSTANCE);
+                indexBasePath, NullStatsLogger.INSTANCE);
         try {
             long lastEntryId = 
entryLocationIndex.getLastEntryInLedger(ledgerId);
             for (long currentEntry = 0; currentEntry <= lastEntryId; 
currentEntry++) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbReadLedgerIndexEntriesTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbReadLedgerIndexEntriesTest.java
new file mode 100644
index 0000000000..bb4b61250e
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbReadLedgerIndexEntriesTest.java
@@ -0,0 +1,152 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.TmpDirs;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for class {@link DbLedgerStorage#readLedgerIndexEntries}.
+ */
+public class DbReadLedgerIndexEntriesTest {
+    private static final int TEST_LEDGER_MIN_ID = 0;
+    private static final int TEST_LEDGER_MAX_ID = 5;
+    private static final int TEST_ENTRY_MIN_ID = 0;
+    private static final int TEST_ENTRY_MAX_ID = 10;
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) 
throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+
+        @Override
+        public void start() {
+            // no-op
+        }
+    };
+
+    protected final TmpDirs tmpDirs = new TmpDirs();
+
+    private String newDirectory() throws Exception {
+        File d = tmpDirs.createNew("bkTest", ".dir");
+        d.delete();
+        d.mkdir();
+        File curDir = BookieImpl.getCurrentDirectory(d);
+        BookieImpl.checkDirectoryStructure(curDir);
+        return d.getPath();
+    }
+
+    @Test
+    public void testReadLedgerIndexEntries() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(new String[]{newDirectory(), newDirectory()});
+        conf.setIndexDirName(new String[]{newDirectory(), newDirectory()});
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        DiskChecker diskChecker = new 
DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(), diskChecker);
+        LedgerDirsManager indexDirsManager = new LedgerDirsManager(conf, 
conf.getIndexDirs(), diskChecker);
+
+        DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, 
indexDirsManager,
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
+        ledgerStorage.setCheckpointer(checkpointer);
+        ledgerStorage.setCheckpointSource(checkpointSource);
+
+        // Insert some ledger & entries in the storage
+        for (long ledgerId = TEST_LEDGER_MIN_ID; ledgerId <= 
TEST_LEDGER_MAX_ID; ledgerId++) {
+            ledgerStorage.setMasterKey(ledgerId, ("ledger-" + 
ledgerId).getBytes());
+            ledgerStorage.setFenced(ledgerId);
+
+            for (long entryId = TEST_ENTRY_MIN_ID; entryId <= 
TEST_ENTRY_MAX_ID; entryId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ledgerStorage.addEntry(entry);
+            }
+        }
+
+        ledgerStorage.flush();
+        ledgerStorage.shutdown();
+
+        // read ledger index entries
+        long ledgerId = TEST_LEDGER_MIN_ID;
+        try {
+            for (ledgerId = TEST_LEDGER_MIN_ID; ledgerId <= 
TEST_LEDGER_MAX_ID; ledgerId++) {
+                BlockingQueue<Long> entrys = new 
ArrayBlockingQueue<>(TEST_ENTRY_MAX_ID + 1);
+                DbLedgerStorage.readLedgerIndexEntries(ledgerId, conf, (eId, 
entryLodId, pos) -> {
+                    System.out.println("entry " + eId + "\t:\t(log: " + 
entryLodId + ", pos: " + pos + ")");
+                    entrys.add(eId);
+                });
+                for (long entryId = TEST_ENTRY_MIN_ID; entryId <= 
TEST_ENTRY_MAX_ID; entryId++) {
+                    Assert.assertTrue(entrys.contains(entryId));
+                }
+            }
+        } catch (Exception e) {
+            System.err.printf("ERROR: initializing dbLedgerStorage %s", 
e.getMessage());
+            Assert.fail("fail to read this ledger(" + ledgerId + ") index 
entries");
+        }
+
+        List<String> toDeleted = Lists.newArrayList(conf.getLedgerDirNames());
+        toDeleted.addAll(Lists.newArrayList(conf.getIndexDirNames()));
+        toDeleted.forEach(d -> {
+            try {
+                FileUtils.forceDelete(new File(d));
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        });
+    }
+}

Reply via email to