This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ec2636c  Issue #1985: Migrate command `convert-to-db-storage`
ec2636c is described below

commit ec2636cd2e3285eefc1eabc2634e5b5576fe81b8
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Mar 19 23:38:40 2019 +0800

    Issue #1985: Migrate command `convert-to-db-storage`
    
    *Motivation*
    
    - Use bkctl to run command `convert-to-db-storage`
    
    *Modifications*
    
    - #1985
    - Add command in `bookieGroup`
    
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1986 from zymap/command-ctdb, closes #1985
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  41 +------
 .../bookie/InterleavedLedgerStorage.java           |   2 +
 .../commands/bookie/ConvertToDBStorageCommand.java | 128 +++++++++++++++++++++
 .../tools/cli/commands/BookieCommandGroup.java     |   2 +
 .../bookie/ConvertToDBStorageCommandTest.java      | 124 ++++++++++++++++++++
 5 files changed, 261 insertions(+), 36 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 3d78c33..ded7d14 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -107,6 +107,7 @@ import 
org.apache.bookkeeper.replication.ReplicationException;
 import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import 
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -2548,42 +2549,10 @@ public class BookieShell implements Tool {
 
         @Override
         int runCmd(CommandLine cmdLine) throws Exception {
-            LOG.info("=== Converting to DbLedgerStorage ===");
-            ServerConfiguration conf = new ServerConfiguration(bkConf);
-
-            InterleavedLedgerStorage interleavedStorage = new 
InterleavedLedgerStorage();
-            Bookie.mountLedgerStorageOffline(conf, interleavedStorage);
-
-            DbLedgerStorage dbStorage = new DbLedgerStorage();
-            Bookie.mountLedgerStorageOffline(conf, dbStorage);
-
-            int convertedLedgers = 0;
-            for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0, 
Long.MAX_VALUE)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Converting ledger {}", 
ledgerIdFormatter.formatLedgerId(ledgerId));
-                }
-
-                LedgerCache.LedgerIndexMetadata fi = 
interleavedStorage.readLedgerIndexMetadata(ledgerId);
-
-                LedgerCache.PageEntriesIterable pages = 
interleavedStorage.getIndexEntries(ledgerId);
-
-                long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, 
fi.fenced, fi.masterKey, pages);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("   -- done. fenced={} entries={}", fi.fenced, 
numberOfEntries);
-                }
-
-                // Remove index from old storage
-                interleavedStorage.deleteLedger(ledgerId);
-
-                if (++convertedLedgers % 1000 == 0) {
-                    LOG.info("Converted {} ledgers", convertedLedgers);
-                }
-            }
-
-            dbStorage.shutdown();
-            interleavedStorage.shutdown();
-
-            LOG.info("---- Done Converting ----");
+            ConvertToDBStorageCommand cmd = new ConvertToDBStorageCommand();
+            ConvertToDBStorageCommand.CTDBFlags flags = new 
ConvertToDBStorageCommand.CTDBFlags();
+            cmd.setLedgerIdFormatter(ledgerIdFormatter);
+            cmd.apply(bkConf, flags);
             return 0;
         }
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 3b5bf01..59ea9ec 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.Cleanup;
+import lombok.Getter;
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -84,6 +85,7 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
     private static final Logger LOG = 
LoggerFactory.getLogger(InterleavedLedgerStorage.class);
 
     EntryLogger entryLogger;
+    @Getter
     LedgerCache ledgerCache;
     protected CheckpointSource checkpointSource;
     protected Checkpointer checkpointer;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java
new file mode 100644
index 0000000..dc48b6c
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerCache;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A command to convert bookie indexes from InterleavedStorage to 
DbLedgerStorage format.
+ */
+public class ConvertToDBStorageCommand extends 
BookieCommand<ConvertToDBStorageCommand.CTDBFlags> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConvertToDBStorageCommand.class);
+    private static final String NAME = "converttodbstorage";
+    private static final String DESC = "Convert bookie indexes from 
InterleavedStorage to DbLedgerStorage format";
+    private static final String NOT_INIT = "default formatter";
+
+    @Setter
+    private LedgerIdFormatter ledgerIdFormatter;
+
+    public ConvertToDBStorageCommand() {
+        this(new CTDBFlags());
+    }
+    public ConvertToDBStorageCommand(CTDBFlags flags) {
+        
super(CliSpec.<CTDBFlags>newBuilder().withFlags(flags).withName(NAME).withDescription(DESC).build());
+    }
+
+    /**
+     * Flags for this command.
+     */
+    @Accessors(fluent = true)
+    @Setter
+    public static class CTDBFlags extends CliFlags {
+        @Parameter(names = { "-l", "--ledgeridformatter" }, description = "Set 
ledger id formatter")
+        private String ledgerIdFormatter = NOT_INIT;
+    }
+
+    @Override
+    public boolean apply(ServerConfiguration conf, CTDBFlags cmdFlags) {
+        initLedgerIdFormatter(conf, cmdFlags);
+        try {
+            return handle(conf);
+        } catch (Exception e) {
+            throw new UncheckedExecutionException(e.getMessage(), e);
+        }
+    }
+
+    private boolean handle(ServerConfiguration conf) throws Exception {
+        LOG.info("=== Converting to DbLedgerStorage ===");
+        ServerConfiguration bkConf = new ServerConfiguration(conf);
+
+        InterleavedLedgerStorage interleavedStorage = new 
InterleavedLedgerStorage();
+        Bookie.mountLedgerStorageOffline(bkConf, interleavedStorage);
+
+        DbLedgerStorage dbStorage = new DbLedgerStorage();
+        Bookie.mountLedgerStorageOffline(bkConf, dbStorage);
+
+        int convertedLedgers = 0;
+        for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0, 
Long.MAX_VALUE)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Converting ledger {}", 
ledgerIdFormatter.formatLedgerId(ledgerId));
+            }
+
+            LedgerCache.LedgerIndexMetadata fi = 
interleavedStorage.readLedgerIndexMetadata(ledgerId);
+
+            LedgerCache.PageEntriesIterable pages = 
interleavedStorage.getIndexEntries(ledgerId);
+
+            long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, 
fi.fenced, fi.masterKey, pages);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("   -- done. fenced={} entries={}", fi.fenced, 
numberOfEntries);
+            }
+
+            // Remove index from old storage
+            interleavedStorage.deleteLedger(ledgerId);
+
+            if (++convertedLedgers % 1000 == 0) {
+                LOG.info("Converted {} ledgers", convertedLedgers);
+            }
+        }
+
+        dbStorage.shutdown();
+        interleavedStorage.shutdown();
+
+        LOG.info("---- Done Converting ----");
+        return true;
+    }
+
+    private void initLedgerIdFormatter(ServerConfiguration conf, CTDBFlags 
flags) {
+        if (this.ledgerIdFormatter != null) {
+            return;
+        }
+        if (flags.ledgerIdFormatter.equals(NOT_INIT)) {
+            this.ledgerIdFormatter = 
LedgerIdFormatter.newLedgerIdFormatter(conf);
+        } else {
+            this.ledgerIdFormatter = 
LedgerIdFormatter.newLedgerIdFormatter(flags.ledgerIdFormatter, conf);
+        }
+    }
+
+}
diff --git 
a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
 
b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
index 940d482..86129a4 100644
--- 
a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
+++ 
b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.tools.cli.commands;
 import static 
org.apache.bookkeeper.tools.common.BKCommandCategories.CATEGORY_INFRA_SERVICE;
 
 import org.apache.bookkeeper.tools.cli.BKCtl;
+import 
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -48,6 +49,7 @@ public class BookieCommandGroup extends 
CliCommandGroup<BKFlags> {
         .addCommand(new FormatCommand())
         .addCommand(new SanityTestCommand())
         .addCommand(new LedgerCommand())
+        .addCommand(new ConvertToDBStorageCommand())
         .build();
 
     public BookieCommandGroup() {
diff --git 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java
 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java
new file mode 100644
index 0000000..cc37087
--- /dev/null
+++ 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.verifyNew;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.Iterator;
+import java.util.Vector;
+import java.util.stream.LongStream;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerCache;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test for {@link ConvertToDBStorageCommand}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ Bookie.class, ConvertToDBStorageCommand.class })
+public class ConvertToDBStorageCommandTest extends BookieCommandTestBase {
+
+    private InterleavedLedgerStorage interleavedLedgerStorage;
+    private DbLedgerStorage dbStorage;
+    private LedgerCache.LedgerIndexMetadata metadata;
+    private LedgerCache.PageEntriesIterable entries;
+
+    public ConvertToDBStorageCommandTest() {
+        super(3, 0);
+    }
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+
+        whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf);
+        
whenNew(ServerConfiguration.class).withParameterTypes(AbstractConfiguration.class).withArguments(conf)
+            .thenReturn(conf);
+
+        interleavedLedgerStorage = mock(InterleavedLedgerStorage.class);
+        
whenNew(InterleavedLedgerStorage.class).withNoArguments().thenReturn(interleavedLedgerStorage);
+        doNothing().when(interleavedLedgerStorage).shutdown();
+        when(interleavedLedgerStorage.getActiveLedgersInRange(anyLong(), 
anyLong())).thenReturn(this::getLedgerId);
+        metadata = mock(LedgerCache.LedgerIndexMetadata.class);
+        
when(interleavedLedgerStorage.readLedgerIndexMetadata(anyLong())).thenReturn(metadata);
+        entries = mock(LedgerCache.PageEntriesIterable.class);
+        
when(interleavedLedgerStorage.getIndexEntries(anyLong())).thenReturn(entries);
+
+        dbStorage = mock(DbLedgerStorage.class);
+        whenNew(DbLedgerStorage.class).withNoArguments().thenReturn(dbStorage);
+        doNothing().when(dbStorage).shutdown();
+        when(dbStorage.addLedgerToIndex(anyLong(), anyBoolean(), eq(new 
byte[0]),
+            any(LedgerCache.PageEntriesIterable.class))).thenReturn(1L);
+
+        PowerMockito.mockStatic(Bookie.class);
+        PowerMockito.when(Bookie.mountLedgerStorageOffline(eq(conf), 
eq(interleavedLedgerStorage)))
+            .thenReturn(PowerMockito.mock(InterleavedLedgerStorage.class));
+        PowerMockito.when(Bookie.mountLedgerStorageOffline(eq(conf), 
eq(dbStorage))).thenReturn(dbStorage);
+    }
+
+    private Iterator<Long> getLedgerId() {
+        Vector<Long> longs = new Vector<>();
+        LongStream.range(0L, 10L).forEach(longs::add);
+        return longs.iterator();
+    }
+
+    @Test
+    public void testCTDB() {
+        ConvertToDBStorageCommand cmd = new ConvertToDBStorageCommand();
+        Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" }));
+
+        try {
+            verifyNew(ServerConfiguration.class, times(1)).withArguments(conf);
+            verifyNew(InterleavedLedgerStorage.class, 
times(1)).withNoArguments();
+            verifyNew(DbLedgerStorage.class, times(1)).withNoArguments();
+
+            verify(interleavedLedgerStorage, 
times(10)).readLedgerIndexMetadata(anyLong());
+            verify(interleavedLedgerStorage, 
times(10)).getIndexEntries(anyLong());
+            verify(dbStorage, times(10))
+                .addLedgerToIndex(anyLong(), anyBoolean(), any(), 
any(LedgerCache.PageEntriesIterable.class));
+            verify(interleavedLedgerStorage, 
times(10)).deleteLedger(anyLong());
+
+            verify(dbStorage, times(1)).shutdown();
+            verify(interleavedLedgerStorage, times(1)).shutdown();
+        } catch (Exception e) {
+            throw new UncheckedExecutionException(e.getMessage(), e);
+        }
+    }
+}

Reply via email to