This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ec2636c Issue #1985: Migrate command `convert-to-db-storage`
ec2636c is described below
commit ec2636cd2e3285eefc1eabc2634e5b5576fe81b8
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Mar 19 23:38:40 2019 +0800
Issue #1985: Migrate command `convert-to-db-storage`
*Motivation*
- Use bkctl to run command `convert-to-db-storage`
*Modifications*
- #1985: move the conversion logic out of `BookieShell` into a new `ConvertToDBStorageCommand`
- Add the command to `BookieCommandGroup`
Reviewers: Sijie Guo <[email protected]>
This closes #1986 from zymap/command-ctdb, closes #1985
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 41 +------
.../bookie/InterleavedLedgerStorage.java | 2 +
.../commands/bookie/ConvertToDBStorageCommand.java | 128 +++++++++++++++++++++
.../tools/cli/commands/BookieCommandGroup.java | 2 +
.../bookie/ConvertToDBStorageCommandTest.java | 124 ++++++++++++++++++++
5 files changed, 261 insertions(+), 36 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 3d78c33..ded7d14 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -107,6 +107,7 @@ import org.apache.bookkeeper.replication.ReplicationException;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -2548,42 +2549,10 @@ public class BookieShell implements Tool {
@Override
int runCmd(CommandLine cmdLine) throws Exception {
- LOG.info("=== Converting to DbLedgerStorage ===");
- ServerConfiguration conf = new ServerConfiguration(bkConf);
-
- InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
- Bookie.mountLedgerStorageOffline(conf, interleavedStorage);
-
- DbLedgerStorage dbStorage = new DbLedgerStorage();
- Bookie.mountLedgerStorageOffline(conf, dbStorage);
-
- int convertedLedgers = 0;
- for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0,
Long.MAX_VALUE)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Converting ledger {}",
ledgerIdFormatter.formatLedgerId(ledgerId));
- }
-
- LedgerCache.LedgerIndexMetadata fi =
interleavedStorage.readLedgerIndexMetadata(ledgerId);
-
- LedgerCache.PageEntriesIterable pages =
interleavedStorage.getIndexEntries(ledgerId);
-
- long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId,
fi.fenced, fi.masterKey, pages);
- if (LOG.isDebugEnabled()) {
- LOG.debug(" -- done. fenced={} entries={}", fi.fenced,
numberOfEntries);
- }
-
- // Remove index from old storage
- interleavedStorage.deleteLedger(ledgerId);
-
- if (++convertedLedgers % 1000 == 0) {
- LOG.info("Converted {} ledgers", convertedLedgers);
- }
- }
-
- dbStorage.shutdown();
- interleavedStorage.shutdown();
-
- LOG.info("---- Done Converting ----");
+ ConvertToDBStorageCommand cmd = new ConvertToDBStorageCommand();
+ ConvertToDBStorageCommand.CTDBFlags flags = new
ConvertToDBStorageCommand.CTDBFlags();
+ cmd.setLedgerIdFormatter(ledgerIdFormatter);
+ cmd.apply(bkConf, flags);
return 0;
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 3b5bf01..59ea9ec 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
+import lombok.Getter;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -84,6 +85,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
private static final Logger LOG =
LoggerFactory.getLogger(InterleavedLedgerStorage.class);
EntryLogger entryLogger;
+ @Getter
LedgerCache ledgerCache;
protected CheckpointSource checkpointSource;
protected Checkpointer checkpointer;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java
new file mode 100644
index 0000000..dc48b6c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerCache;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A command to convert bookie indexes from InterleavedStorage to
DbLedgerStorage format.
+ */
+public class ConvertToDBStorageCommand extends
BookieCommand<ConvertToDBStorageCommand.CTDBFlags> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ConvertToDBStorageCommand.class);
+ private static final String NAME = "converttodbstorage";
+ private static final String DESC = "Convert bookie indexes from
InterleavedStorage to DbLedgerStorage format";
+ private static final String NOT_INIT = "default formatter";
+
+ @Setter
+ private LedgerIdFormatter ledgerIdFormatter;
+
+ public ConvertToDBStorageCommand() {
+ this(new CTDBFlags());
+ }
+ public ConvertToDBStorageCommand(CTDBFlags flags) {
+
super(CliSpec.<CTDBFlags>newBuilder().withFlags(flags).withName(NAME).withDescription(DESC).build());
+ }
+
+ /**
+ * Flags for this command.
+ */
+ @Accessors(fluent = true)
+ @Setter
+ public static class CTDBFlags extends CliFlags {
+ @Parameter(names = { "-l", "--ledgeridformatter" }, description = "Set
ledger id formatter")
+ private String ledgerIdFormatter = NOT_INIT;
+ }
+
+ @Override
+ public boolean apply(ServerConfiguration conf, CTDBFlags cmdFlags) {
+ initLedgerIdFormatter(conf, cmdFlags);
+ try {
+ return handle(conf);
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+ }
+
+ private boolean handle(ServerConfiguration conf) throws Exception {
+ LOG.info("=== Converting to DbLedgerStorage ===");
+ ServerConfiguration bkConf = new ServerConfiguration(conf);
+
+ InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
+ Bookie.mountLedgerStorageOffline(bkConf, interleavedStorage);
+
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ Bookie.mountLedgerStorageOffline(bkConf, dbStorage);
+
+ int convertedLedgers = 0;
+ for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0,
Long.MAX_VALUE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting ledger {}",
ledgerIdFormatter.formatLedgerId(ledgerId));
+ }
+
+ LedgerCache.LedgerIndexMetadata fi =
interleavedStorage.readLedgerIndexMetadata(ledgerId);
+
+ LedgerCache.PageEntriesIterable pages =
interleavedStorage.getIndexEntries(ledgerId);
+
+ long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId,
fi.fenced, fi.masterKey, pages);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" -- done. fenced={} entries={}", fi.fenced,
numberOfEntries);
+ }
+
+ // Remove index from old storage
+ interleavedStorage.deleteLedger(ledgerId);
+
+ if (++convertedLedgers % 1000 == 0) {
+ LOG.info("Converted {} ledgers", convertedLedgers);
+ }
+ }
+
+ dbStorage.shutdown();
+ interleavedStorage.shutdown();
+
+ LOG.info("---- Done Converting ----");
+ return true;
+ }
+
+ private void initLedgerIdFormatter(ServerConfiguration conf, CTDBFlags
flags) {
+ if (this.ledgerIdFormatter != null) {
+ return;
+ }
+ if (flags.ledgerIdFormatter.equals(NOT_INIT)) {
+ this.ledgerIdFormatter =
LedgerIdFormatter.newLedgerIdFormatter(conf);
+ } else {
+ this.ledgerIdFormatter =
LedgerIdFormatter.newLedgerIdFormatter(flags.ledgerIdFormatter, conf);
+ }
+ }
+
+}
diff --git a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
index 940d482..86129a4 100644
--- a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
+++ b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.tools.cli.commands;
import static
org.apache.bookkeeper.tools.common.BKCommandCategories.CATEGORY_INFRA_SERVICE;
import org.apache.bookkeeper.tools.cli.BKCtl;
+import
org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -48,6 +49,7 @@ public class BookieCommandGroup extends CliCommandGroup<BKFlags> {
.addCommand(new FormatCommand())
.addCommand(new SanityTestCommand())
.addCommand(new LedgerCommand())
+ .addCommand(new ConvertToDBStorageCommand())
.build();
public BookieCommandGroup() {
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java
new file mode 100644
index 0000000..cc37087
--- /dev/null
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.verifyNew;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.Iterator;
+import java.util.Vector;
+import java.util.stream.LongStream;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerCache;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test for {@link ConvertToDBStorageCommand}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ Bookie.class, ConvertToDBStorageCommand.class })
+public class ConvertToDBStorageCommandTest extends BookieCommandTestBase {
+
+ private InterleavedLedgerStorage interleavedLedgerStorage;
+ private DbLedgerStorage dbStorage;
+ private LedgerCache.LedgerIndexMetadata metadata;
+ private LedgerCache.PageEntriesIterable entries;
+
+ public ConvertToDBStorageCommandTest() {
+ super(3, 0);
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+
+ whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf);
+
whenNew(ServerConfiguration.class).withParameterTypes(AbstractConfiguration.class).withArguments(conf)
+ .thenReturn(conf);
+
+ interleavedLedgerStorage = mock(InterleavedLedgerStorage.class);
+
whenNew(InterleavedLedgerStorage.class).withNoArguments().thenReturn(interleavedLedgerStorage);
+ doNothing().when(interleavedLedgerStorage).shutdown();
+ when(interleavedLedgerStorage.getActiveLedgersInRange(anyLong(),
anyLong())).thenReturn(this::getLedgerId);
+ metadata = mock(LedgerCache.LedgerIndexMetadata.class);
+
when(interleavedLedgerStorage.readLedgerIndexMetadata(anyLong())).thenReturn(metadata);
+ entries = mock(LedgerCache.PageEntriesIterable.class);
+
when(interleavedLedgerStorage.getIndexEntries(anyLong())).thenReturn(entries);
+
+ dbStorage = mock(DbLedgerStorage.class);
+ whenNew(DbLedgerStorage.class).withNoArguments().thenReturn(dbStorage);
+ doNothing().when(dbStorage).shutdown();
+ when(dbStorage.addLedgerToIndex(anyLong(), anyBoolean(), eq(new
byte[0]),
+ any(LedgerCache.PageEntriesIterable.class))).thenReturn(1L);
+
+ PowerMockito.mockStatic(Bookie.class);
+ PowerMockito.when(Bookie.mountLedgerStorageOffline(eq(conf),
eq(interleavedLedgerStorage)))
+ .thenReturn(PowerMockito.mock(InterleavedLedgerStorage.class));
+ PowerMockito.when(Bookie.mountLedgerStorageOffline(eq(conf),
eq(dbStorage))).thenReturn(dbStorage);
+ }
+
+ private Iterator<Long> getLedgerId() {
+ Vector<Long> longs = new Vector<>();
+ LongStream.range(0L, 10L).forEach(longs::add);
+ return longs.iterator();
+ }
+
+ @Test
+ public void testCTDB() {
+ ConvertToDBStorageCommand cmd = new ConvertToDBStorageCommand();
+ Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" }));
+
+ try {
+ verifyNew(ServerConfiguration.class, times(1)).withArguments(conf);
+ verifyNew(InterleavedLedgerStorage.class,
times(1)).withNoArguments();
+ verifyNew(DbLedgerStorage.class, times(1)).withNoArguments();
+
+ verify(interleavedLedgerStorage,
times(10)).readLedgerIndexMetadata(anyLong());
+ verify(interleavedLedgerStorage,
times(10)).getIndexEntries(anyLong());
+ verify(dbStorage, times(10))
+ .addLedgerToIndex(anyLong(), anyBoolean(), any(),
any(LedgerCache.PageEntriesIterable.class));
+ verify(interleavedLedgerStorage,
times(10)).deleteLedger(anyLong());
+
+ verify(dbStorage, times(1)).shutdown();
+ verify(interleavedLedgerStorage, times(1)).shutdown();
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+ }
+}