This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 89ac2a2  DbLedgerStorage -- Added bookie shell tools
89ac2a2 is described below

commit 89ac2a2b7b7883f0b48978bf3dc3802ecf0fc6c1
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Dec 15 11:54:06 2017 -0800

    DbLedgerStorage -- Added bookie shell tools
    
    Added BookieShell tools for DbLedgerStorage conversion operations:
    
     * `convert-to-db-storage` --> Convert interleaved indexes to db storage 
indexes
     * `convert-to-interleaved-storage` --> Convert back from db storage to 
interleaved storage indexes
     * `rebuild-db-ledger-locations-index` --> Rebuild the DbLedgerStorage 
index by scanning the entry log files.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #859 from merlimat/db-ledger-storage-itemized
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 308 ++++++++++++++++++++-
 .../bookie/storage/ldb/DbLedgerStorage.java        |   8 +-
 .../storage/ldb/LocationsIndexRebuildOp.java       | 139 ++++++++++
 .../bookie/storage/ldb/ConversionRollbackTest.java | 145 ++++++++++
 .../bookie/storage/ldb/ConversionTest.java         | 158 +++++++++++
 .../storage/ldb/LocationsIndexRebuildTest.java     | 148 ++++++++++
 6 files changed, 901 insertions(+), 5 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index f8946c1..ccffb25 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Charsets.UTF_8;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import io.netty.buffer.ByteBuf;
@@ -33,6 +34,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.math.RoundingMode;
 import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -59,8 +61,12 @@ import java.util.function.Predicate;
 
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
+import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -142,6 +148,9 @@ public class BookieShell implements Tool {
     static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
     static final String CMD_LOSTBOOKIERECOVERYDELAY = 
"lostbookierecoverydelay";
     static final String CMD_TRIGGERAUDIT = "triggeraudit";
+    static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
+    static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = 
"convert-to-interleaved-storage";
+    static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = 
"rebuild-db-ledger-locations-index";
     static final String CMD_HELP = "help";
 
     final ServerConfiguration bkConf = new ServerConfiguration();
@@ -1879,7 +1888,6 @@ public class BookieShell implements Tool {
             }
             return 0;
         }
-
     }
 
     /**
@@ -2113,6 +2121,241 @@ public class BookieShell implements Tool {
         void progress(long updated, long issued);
     }
 
+
+    /**
+     * Convert bookie indexes from InterleavedStorage to DbLedgerStorage 
format.
+     */
+    class ConvertToDbStorageCmd extends MyCommand {
+        Options opts = new Options();
+
+        public ConvertToDbStorageCmd() {
+            super(CMD_CONVERT_TO_DB_STORAGE);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Convert bookie indexes from InterleavedStorage to 
DbLedgerStorage format";
+        }
+
+        String getUsage() {
+            return CMD_CONVERT_TO_DB_STORAGE;
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            LOG.info("=== Converting to DbLedgerStorage ===");
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            LedgerDirsManager ledgerDirsManager = new 
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), 
bkConf.getDiskUsageWarnThreshold()));
+            LedgerDirsManager ledgerIndexManager = new 
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), 
bkConf.getDiskUsageWarnThreshold()));
+
+            InterleavedLedgerStorage interleavedStorage = new 
InterleavedLedgerStorage();
+            DbLedgerStorage dbStorage = new DbLedgerStorage();
+
+            CheckpointSource checkpointSource = new CheckpointSource() {
+                    @Override
+                    public Checkpoint newCheckpoint() {
+                        return Checkpoint.MAX;
+                    }
+
+                    @Override
+                    public void checkpointComplete(Checkpoint checkpoint, 
boolean compact)
+                            throws IOException {
+                    }
+                };
+            Checkpointer checkpointer = new Checkpointer() {
+                @Override
+                public void startCheckpoint(Checkpoint checkpoint) {
+                    // No-op
+                }
+            };
+
+            interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
+                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+            dbStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
+                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+
+            int convertedLedgers = 0;
+            for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0, 
Long.MAX_VALUE)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Converting ledger {}", ledgerId);
+                }
+
+                FileInfo fi = getFileInfo(ledgerId);
+
+                Iterable<SortedMap<Long, Long>> entries = 
getLedgerIndexEntries(ledgerId);
+
+                long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, 
fi.isFenced(), fi.getMasterKey(), entries);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("   -- done. fenced={} entries={}", 
fi.isFenced(), numberOfEntries);
+                }
+
+                // Remove index from old storage
+                interleavedStorage.deleteLedger(ledgerId);
+
+                if (++convertedLedgers % 1000 == 0) {
+                    LOG.info("Converted {} ledgers", convertedLedgers);
+                }
+            }
+
+            dbStorage.shutdown();
+            interleavedStorage.shutdown();
+
+            LOG.info("---- Done Converting ----");
+            return 0;
+        }
+    }
+
+    /**
+     * Convert bookie indexes from DbLedgerStorage to InterleavedStorage 
format.
+     */
+    class ConvertToInterleavedStorageCmd extends MyCommand {
+        Options opts = new Options();
+
+        public ConvertToInterleavedStorageCmd() {
+            super(CMD_CONVERT_TO_INTERLEAVED_STORAGE);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Convert bookie indexes from DbLedgerStorage to 
InterleavedStorage format";
+        }
+
+        String getUsage() {
+            return CMD_CONVERT_TO_INTERLEAVED_STORAGE;
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            LOG.info("=== Converting DbLedgerStorage ===");
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            LedgerDirsManager ledgerDirsManager = new 
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), 
bkConf.getDiskUsageWarnThreshold()));
+            LedgerDirsManager ledgerIndexManager = new 
LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), 
bkConf.getDiskUsageWarnThreshold()));
+
+            DbLedgerStorage dbStorage = new DbLedgerStorage();
+            InterleavedLedgerStorage interleavedStorage = new 
InterleavedLedgerStorage();
+
+            CheckpointSource checkpointSource = new CheckpointSource() {
+                    @Override
+                    public Checkpoint newCheckpoint() {
+                        return Checkpoint.MAX;
+                    }
+
+                    @Override
+                    public void checkpointComplete(Checkpoint checkpoint, 
boolean compact)
+                            throws IOException {
+                    }
+                };
+            Checkpointer checkpointer = new Checkpointer() {
+                @Override
+                public void startCheckpoint(Checkpoint checkpoint) {
+                    // No-op
+                }
+            };
+
+            dbStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
+                        checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
+            interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
+                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+            LedgerCache interleavedLedgerCache = 
interleavedStorage.ledgerCache;
+
+            EntryLocationIndex dbEntryLocationIndex = 
dbStorage.getEntryLocationIndex();
+
+            int convertedLedgers = 0;
+            for (long ledgerId : dbStorage.getActiveLedgersInRange(0, 
Long.MAX_VALUE)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Converting ledger {}", ledgerId);
+                }
+
+                interleavedStorage.setMasterKey(ledgerId, 
dbStorage.readMasterKey(ledgerId));
+                if (dbStorage.isFenced(ledgerId)) {
+                    interleavedStorage.setFenced(ledgerId);
+                }
+
+                long lastEntryInLedger = 
dbEntryLocationIndex.getLastEntryInLedger(ledgerId);
+                for (long entryId = 0; entryId <= lastEntryInLedger; 
entryId++) {
+                    try {
+                        long location = 
dbEntryLocationIndex.getLocation(ledgerId, entryId);
+                        if (location != 0L) {
+                            interleavedLedgerCache.putEntryOffset(ledgerId, 
entryId, location);
+                        }
+                    } catch (Bookie.NoEntryException e) {
+                        // Ignore entry
+                    }
+                }
+
+                if (++convertedLedgers % 1000 == 0) {
+                    LOG.info("Converted {} ledgers", convertedLedgers);
+                }
+            }
+
+            dbStorage.shutdown();
+
+            interleavedLedgerCache.flushLedger(true);
+            interleavedStorage.flush();
+            interleavedStorage.shutdown();
+
+            String baseDir = 
ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+            // Rename databases and keep backup
+            Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
+                    FileSystems.getDefault().getPath(baseDir, 
"ledgers.backup"));
+
+            Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
+                    FileSystems.getDefault().getPath(baseDir, 
"locations.backup"));
+
+            LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
+            return 0;
+        }
+    }
+
+    /**
+     * Rebuild DbLedgerStorage locations index.
+     */
+    class RebuildDbLedgerLocationsIndexCmd extends MyCommand {
+        Options opts = new Options();
+
+        public RebuildDbLedgerLocationsIndexCmd() {
+            super(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Rebuild DbLedgerStorage locations index by scanning the 
entry logs";
+        }
+
+        String getUsage() {
+            return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX;
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            LOG.info("=== Rebuilding bookie index ===");
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            new LocationsIndexRebuildOp(conf).initiate();
+            LOG.info("-- Done rebuilding bookie index --");
+            return 0;
+        }
+    }
+
     final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
     {
         commands.put(CMD_METAFORMAT, new MetaFormatCmd());
@@ -2138,6 +2381,9 @@ public class BookieShell implements Tool {
         commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
         commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
         commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
+        commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
+        commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new 
ConvertToInterleavedStorageCmd());
+        commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new 
RebuildDbLedgerLocationsIndexCmd());
         commands.put(CMD_HELP, new HelpCmd());
         commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new 
LostBookieRecoveryDelayCmd());
         commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
@@ -2433,6 +2679,66 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Get an iterable over pages of entries and locations for a ledger.
+     *
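+     * @param ledgerId id of the ledger whose index file is read
+     * @return iterable of per-page maps from entry id to entry location
+     * @throws IOException if the ledger's index file cannot be accessed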
+     */
+    protected Iterable<SortedMap<Long, Long>> getLedgerIndexEntries(final long 
ledgerId) throws IOException {
+        final FileInfo fi = getFileInfo(ledgerId);
+        final long size = fi.size();
+
+        final LedgerEntryPage lep = new LedgerEntryPage(pageSize, 
entriesPerPage);
+        lep.usePage();
+
+        final Iterator<SortedMap<Long, Long>> iterator = new 
Iterator<SortedMap<Long, Long>>() {
+            long curSize = 0;
+            long curEntry = 0;
+
+            @Override
+            public boolean hasNext() {
+                return curSize < size;
+            }
+
+            @Override
+            public SortedMap<Long, Long> next() {
+                SortedMap<Long, Long> entries = Maps.newTreeMap();
+                lep.setLedgerAndFirstEntry(ledgerId, curEntry);
+                try {
+                    lep.readPage(fi);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                // process a page
+                for (int i = 0; i < entriesPerPage; i++) {
+                    long offset = lep.getOffset(i * 8);
+                    if (offset != 0) {
+                        entries.put(curEntry, offset);
+                    }
+                    ++curEntry;
+                }
+
+                curSize += pageSize;
+                return entries;
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("Cannot remove");
+            }
+
+        };
+
+        return new Iterable<SortedMap<Long, Long>>() {
+            public Iterator<SortedMap<Long, Long>> iterator() {
+                return iterator;
+            }
+        };
+    }
+
+    /**
      * Scan over an entry log file.
      *
      * @param logId
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index fa80016..b0257ec 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -30,8 +30,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
@@ -53,9 +51,11 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -714,8 +714,8 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
     }
 
     @Override
-    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer)
-            throws IOException {
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long 
previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
         throw new UnsupportedOperationException();
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
new file mode 100644
index 0000000..6cf6232
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan all entries in the entry logs and rebuild the locations index.
+ */
+public class LocationsIndexRebuildOp {
+    private final ServerConfiguration conf;
+
+    public LocationsIndexRebuildOp(ServerConfiguration conf) {
+        this.conf = conf;
+    }
+
+    public void initiate() throws IOException {
+        LOG.info("Starting index rebuilding");
+
+        // Move locations index to a backup directory
+        String basePath = 
Bookie.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
+        Path currentPath = FileSystems.getDefault().getPath(basePath, 
"locations");
+        String timestamp = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+        Path backupPath = FileSystems.getDefault().getPath(basePath, 
"locations.BACKUP-" + timestamp);
+        Files.move(currentPath, backupPath);
+
+        LOG.info("Created locations index backup at {}", backupPath);
+
+        long startTime = System.nanoTime();
+
+        EntryLogger entryLogger = new EntryLogger(conf, new 
LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold())));
+        Set<Long> entryLogs = entryLogger.getEntryLogsSet();
+
+        String locationsDbPath = FileSystems.getDefault().getPath(basePath, 
"locations").toFile().toString();
+
+        Set<Long> activeLedgers = getActiveLedgers(conf, 
KeyValueStorageRocksDB.factory, basePath);
+        LOG.info("Found {} active ledgers in ledger manager", 
activeLedgers.size());
+
+        KeyValueStorage newIndex = 
KeyValueStorageRocksDB.factory.newKeyValueStorage(locationsDbPath, 
DbConfigType.Huge,
+                conf);
+
+        int totalEntryLogs = entryLogs.size();
+        int completedEntryLogs = 0;
+        LOG.info("Scanning {} entry logs", totalEntryLogs);
+
+        for (long entryLogId : entryLogs) {
+            entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
+                @Override
+                public void process(long ledgerId, long offset, ByteBuf entry) 
throws IOException {
+                    long entryId = entry.getLong(8);
+
+                    // The indexed location points past the entry size field
+                    long location = (entryLogId << 32L) | (offset + 4);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Rebuilding {}:{} at location {} / {}", 
ledgerId, entryId, location >> 32,
+                                location & (Integer.MAX_VALUE - 1));
+                    }
+
+                    // Add the entry location to the new locations index
+                    LongPairWrapper key = LongPairWrapper.get(ledgerId, 
entryId);
+                    LongWrapper value = LongWrapper.get(location);
+                    newIndex.put(key.array, value.array);
+                }
+
+                @Override
+                public boolean accept(long ledgerId) {
+                    return activeLedgers.contains(ledgerId);
+                }
+            });
+
+            ++completedEntryLogs;
+            LOG.info("Completed scanning of log {}.log -- {} / {}", 
Long.toHexString(entryLogId), completedEntryLogs,
+                    totalEntryLogs);
+        }
+
+        newIndex.sync();
+        newIndex.close();
+
+        LOG.info("Rebuilding index is done. Total time: {}",
+                
DurationFormatUtils.formatDurationHMS(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
 - startTime)));
+    }
+
+    private Set<Long> getActiveLedgers(ServerConfiguration conf, 
KeyValueStorageFactory storageFactory, String basePath)
+            throws IOException {
+        LedgerMetadataIndex ledgers = new LedgerMetadataIndex(conf, 
storageFactory, basePath, NullStatsLogger.INSTANCE);
+        Set<Long> activeLedgers = Sets.newHashSet();
+        for (Long ledger : ledgers.getActiveLedgersInRange(0, Long.MAX_VALUE)) 
{
+            activeLedgers.add(ledger);
+        }
+
+        ledgers.close();
+        return activeLedgers;
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(LocationsIndexRebuildOp.class);
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
new file mode 100644
index 0000000..a954f28
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for BookieShell convert-to-interleaved-storage command.
+ */
+@Slf4j
+public class ConversionRollbackTest {
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) 
throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+    };
+
+    @Test
+    public void convertFromDbStorageToInterleaved() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        log.info("Using temp directory: {}", tmpDir);
+
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setAllowLoopback(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+
+        DbLedgerStorage dbStorage = new DbLedgerStorage();
+        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, 
checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        // Insert some ledger & entries in the dbStorage
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            dbStorage.setMasterKey(ledgerId, ("ledger-" + 
ledgerId).getBytes());
+            dbStorage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                dbStorage.addEntry(entry);
+            }
+        }
+
+        dbStorage.flush();
+        dbStorage.shutdown();
+
+        // Run conversion tool
+        BookieShell shell = new BookieShell();
+        shell.setConf(conf);
+        int res = shell.run(new String[] { "convert-to-interleaved-storage" });
+
+        Assert.assertEquals(0, res);
+
+        // Verify that interleaved storage index has the same entries
+        InterleavedLedgerStorage interleavedStorage = new 
InterleavedLedgerStorage();
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        Set<Long> ledgers = 
Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 
4L)), ledgers);
+
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            Assert.assertEquals(true, interleavedStorage.isFenced(ledgerId));
+            Assert.assertEquals("ledger-" + ledgerId, new 
String(interleavedStorage.readMasterKey(ledgerId)));
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ByteBuf result = interleavedStorage.getEntry(ledgerId, 
entryId);
+                Assert.assertEquals(entry, result);
+            }
+        }
+
+        interleavedStorage.shutdown();
+        FileUtils.forceDelete(tmpDir);
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
new file mode 100644
index 0000000..df5434c
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for BookieShell convert-to-db-storage command.
+ *
+ * <p>The test writes a few ledgers through {@link InterleavedLedgerStorage}, runs the shell command and
+ * then checks that {@link DbLedgerStorage} returns the same data while the interleaved storage no longer
+ * reports any ledger.
+ */
+public class ConversionTest {
+
+    // Checkpoint source that always hands out Checkpoint.MAX and ignores completion notifications.
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+            // No-op
+        }
+    };
+
+    // Checkpointer whose startCheckpoint() does nothing.
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+    };
+
+    /**
+     * Builds the payload used for a given entry: ledger id, entry id, then the bytes of
+     * {@code "entry-<entryId>"}.
+     *
+     * @param ledgerId ledger the entry belongs to
+     * @param entryId id of the entry within the ledger
+     * @return a newly allocated buffer holding the payload
+     */
+    private static ByteBuf newEntry(long ledgerId, long entryId) {
+        ByteBuf entry = Unpooled.buffer(128);
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeBytes(("entry-" + entryId).getBytes());
+        return entry;
+    }
+
+    @Test
+    public void test() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        // createTempFile() creates a regular file; replace it with a directory of the same name and
+        // fail right away if that does not work instead of continuing with a broken layout.
+        Assert.assertTrue("could not delete placeholder file " + tmpDir, tmpDir.delete());
+        Assert.assertTrue("could not create directory " + tmpDir, tmpDir.mkdir());
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        System.out.println(tmpDir);
+
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setAllowLoopback(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource,
+                checkpointer, NullStatsLogger.INSTANCE);
+
+        // Insert some ledger & entries in the interleaved storage
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            interleavedStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+            interleavedStorage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                interleavedStorage.addEntry(newEntry(ledgerId, entryId));
+            }
+        }
+
+        interleavedStorage.flush();
+        interleavedStorage.shutdown();
+
+        // Run conversion tool
+        BookieShell shell = new BookieShell();
+        shell.setConf(conf);
+        int res = shell.run(new String[] { "convert-to-db-storage" });
+
+        Assert.assertEquals(0, res);
+
+        // Verify that db index has the same entries
+        DbLedgerStorage dbStorage = new DbLedgerStorage();
+        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        interleavedStorage = new InterleavedLedgerStorage();
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource,
+                checkpointer, NullStatsLogger.INSTANCE);
+
+        Set<Long> ledgers = Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
+
+        // After the conversion the interleaved storage must not list any ledger
+        ledgers = Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(), ledgers);
+
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            Assert.assertTrue(dbStorage.isFenced(ledgerId));
+            Assert.assertEquals("ledger-" + ledgerId, new String(dbStorage.readMasterKey(ledgerId)));
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf expected = newEntry(ledgerId, entryId);
+                ByteBuf result = dbStorage.getEntry(ledgerId, entryId);
+                Assert.assertEquals(expected, result);
+                // Both buffers are reference counted; give them back once compared
+                result.release();
+                expected.release();
+
+                try {
+                    interleavedStorage.getEntry(ledgerId, entryId);
+                    Assert.fail("entry should not exist");
+                } catch (NoLedgerException e) {
+                    // Ok
+                }
+            }
+        }
+
+        interleavedStorage.shutdown();
+        dbStorage.shutdown();
+        FileUtils.forceDelete(tmpDir);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
new file mode 100644
index 0000000..4793ec7
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for class {@link LocationsIndexRebuildOp}.
+ *
+ * <p>The test writes a few ledgers through {@link DbLedgerStorage}, runs the
+ * {@code rebuild-db-ledger-locations-index} shell command and then checks that a freshly initialized
+ * {@link DbLedgerStorage} still returns the same ledgers and entries.
+ */
+public class LocationsIndexRebuildTest {
+
+    // Checkpoint source that always hands out Checkpoint.MAX and ignores completion notifications.
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+            // No-op
+        }
+    };
+
+    // Checkpointer whose startCheckpoint() does nothing.
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+    };
+
+    /**
+     * Builds the payload used for a given entry: ledger id, entry id, then the bytes of
+     * {@code "entry-<entryId>"}.
+     *
+     * @param ledgerId ledger the entry belongs to
+     * @param entryId id of the entry within the ledger
+     * @return a newly allocated buffer holding the payload
+     */
+    private static ByteBuf newEntry(long ledgerId, long entryId) {
+        ByteBuf entry = Unpooled.buffer(128);
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeBytes(("entry-" + entryId).getBytes());
+        return entry;
+    }
+
+    @Test
+    public void test() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        // createTempFile() creates a regular file; replace it with a directory of the same name and
+        // fail right away if that does not work instead of continuing with a broken layout.
+        Assert.assertTrue("could not delete placeholder file " + tmpDir, tmpDir.delete());
+        Assert.assertTrue("could not create directory " + tmpDir, tmpDir.mkdir());
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        System.out.println(tmpDir);
+
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setAllowLoopback(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource,
+                checkpointer, NullStatsLogger.INSTANCE);
+
+        // Insert some ledger & entries in the storage
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            ledgerStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+            ledgerStorage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 100; entryId++) {
+                ledgerStorage.addEntry(newEntry(ledgerId, entryId));
+            }
+        }
+
+        ledgerStorage.flush();
+        ledgerStorage.shutdown();
+
+        // Rebuild index through the tool
+        BookieShell shell = new BookieShell();
+        shell.setConf(conf);
+        int res = shell.run(new String[] { "rebuild-db-ledger-locations-index" });
+
+        Assert.assertEquals(0, res);
+
+        // Verify that db index has the same entries
+        ledgerStorage = new DbLedgerStorage();
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource,
+                checkpointer, NullStatsLogger.INSTANCE);
+
+        Set<Long> ledgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
+
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            Assert.assertTrue(ledgerStorage.isFenced(ledgerId));
+            Assert.assertEquals("ledger-" + ledgerId, new String(ledgerStorage.readMasterKey(ledgerId)));
+
+            // The last entry starts with the ledger id followed by the entry id (100 entries => id 99)
+            ByteBuf lastEntry = ledgerStorage.getLastEntry(ledgerId);
+            assertEquals(ledgerId, lastEntry.readLong());
+            long lastEntryId = lastEntry.readLong();
+            assertEquals(99, lastEntryId);
+            // Buffers handed out by the storage are reference counted; give this one back
+            lastEntry.release();
+
+            for (long entryId = 0; entryId < 100; entryId++) {
+                ByteBuf expected = newEntry(ledgerId, entryId);
+                ByteBuf result = ledgerStorage.getEntry(ledgerId, entryId);
+                Assert.assertEquals(expected, result);
+                // Release both buffers once compared
+                result.release();
+                expected.release();
+            }
+        }
+
+        ledgerStorage.shutdown();
+        FileUtils.forceDelete(tmpDir);
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to