Repository: bookkeeper Updated Branches: refs/heads/master 1a98088e3 -> a13d75d7e
BOOKKEEPER-894: add command to read ledger entries from shell Author: Siddharth Boobna <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #24 from sboobna/BOOKKEEPER-894 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/a13d75d7 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/a13d75d7 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/a13d75d7 Branch: refs/heads/master Commit: a13d75d7eae29f1ce42dbfe5c6b4878a822fdf91 Parents: 1a98088 Author: Siddharth Boobna <[email protected]> Authored: Tue Mar 15 21:03:33 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Mar 15 21:03:33 2016 -0700 ---------------------------------------------------------------------- .../apache/bookkeeper/bookie/BookieShell.java | 123 +++++++++++++++---- .../bookkeeper/client/BookKeeperAdmin.java | 118 ++++++++++++++++++ .../apache/bookkeeper/client/LedgerHandle.java | 10 +- 3 files changed, 227 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a13d75d7/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index c7ed2f8..7d49a6a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -18,70 +18,69 @@ package org.apache.bookkeeper.bookie; +import static com.google.common.base.Charsets.UTF_8; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Formatter; -import java.util.HashMap; -import 
java.util.Map; -import java.util.List; import java.util.ArrayList; -import java.util.Iterator; +import java.util.Collection; import java.util.Collections; import java.util.Enumeration; -import java.util.Collection; +import java.util.Formatter; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.replication.AuditorElector; import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; import org.apache.bookkeeper.bookie.Journal.JournalScanner; import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeperAdmin; -import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.BookKeeperAdmin; +import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.client.UpdateLedgerOp; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; -import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; +import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.replication.AuditorElector; import org.apache.bookkeeper.util.EntryFormatter; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.Tool; -import 
org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; -import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; - -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; - -import com.google.common.util.concurrent.AbstractFuture; -import static com.google.common.base.Charsets.UTF_8; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.MissingArgumentException; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.io.HexDump; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.lang.StringUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.AbstractFuture; + /** * Bookie Shell is to provide utilities for users to administer a bookkeeper cluster. 
*/ @@ -95,6 +94,7 @@ public class BookieShell implements Tool { static final String CMD_BOOKIEFORMAT = "bookieformat"; static final String CMD_RECOVER = "recover"; static final String CMD_LEDGER = "ledger"; + static final String CMD_READ_LEDGER_ENTRIES = "readledger"; static final String CMD_LISTLEDGERS = "listledgers"; static final String CMD_LEDGERMETADATA = "ledgermetadata"; static final String CMD_LISTUNDERREPLICATED = "listunderreplicated"; @@ -397,6 +397,86 @@ public class BookieShell implements Tool { } /** + * Command for reading ledger entries + */ + class ReadLedgerEntriesCmd extends MyCommand { + Options lOpts = new Options(); + + ReadLedgerEntriesCmd() { + super(CMD_READ_LEDGER_ENTRIES); + } + + @Override + Options getOptions() { + return lOpts; + } + + @Override + String getDescription() { + return "Read a range of entries from a ledger"; + } + + @Override + String getUsage() { + return "readledger <ledger_id> [<start_entry_id> [<end_entry_id>]]"; + } + + @Override + int runCmd(CommandLine cmdLine) throws Exception { + String[] leftArgs = cmdLine.getArgs(); + if (leftArgs.length <= 0) { + System.err.println("ERROR: missing ledger id"); + printUsage(); + return -1; + } + + long ledgerId; + long firstEntry = 0; + long lastEntry = -1; + try { + ledgerId = Long.parseLong(leftArgs[0]); + if (leftArgs.length >= 2) { + firstEntry = Long.parseLong(leftArgs[1]); + } + if (leftArgs.length >= 3) { + lastEntry = Long.parseLong(leftArgs[2]); + } + } catch (NumberFormatException nfe) { + System.err.println("ERROR: invalid number " + nfe.getMessage()); + printUsage(); + return -1; + } + + ClientConfiguration conf = new ClientConfiguration(); + conf.addConfiguration(bkConf); + + BookKeeperAdmin bk = null; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + bk = new BookKeeperAdmin(conf); + Iterator<LedgerEntry> entries = bk.readEntries(ledgerId, firstEntry, lastEntry).iterator(); + while (entries.hasNext()) { + LedgerEntry entry = entries.next(); + 
HexDump.dump(entry.getEntry(), 0, out, 0); + System.out.println("Entry Id: " + entry.getEntryId() + ", Data: " + new String(out.toByteArray())); + out.reset(); + } + } catch (Exception e) { + LOG.error("Error reading entries from ledger {}", ledgerId, e.getCause()); + return -1; + } finally { + out.close(); + if (bk != null) { + bk.close(); + } + } + + return 0; + } + + } + + /** * Command for listing underreplicated ledgers */ class ListUnderreplicatedCmd extends MyCommand { @@ -1425,6 +1505,7 @@ public class BookieShell implements Tool { commands.put(CMD_BOOKIEFORMAT, new BookieFormatCmd()); commands.put(CMD_RECOVER, new RecoverCmd()); commands.put(CMD_LEDGER, new LedgerCmd()); + commands.put(CMD_READ_LEDGER_ENTRIES, new ReadLedgerEntriesCmd()); commands.put(CMD_LISTLEDGERS, new ListLedgersCmd()); commands.put(CMD_LISTUNDERREPLICATED, new ListUnderreplicatedCmd()); commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd()); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a13d75d7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index ff339db..0bc5c45 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.client; import static com.google.common.base.Charsets.UTF_8; +import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; import java.net.UnknownHostException; @@ -310,6 +311,123 @@ public class BookKeeperAdmin { return counter.getLh(); } + /** + * Read entries from a ledger synchronously. 
If the lastEntry is -1, it will read all the entries in the ledger from + * the firstEntry. + * + * @param ledgerId + * @param firstEntry + * @param lastEntry + * @return + * @throws InterruptedException + * @throws BKException + */ + public Iterable<LedgerEntry> readEntries(long ledgerId, long firstEntry, long lastEntry) + throws InterruptedException, BKException { + checkArgument(ledgerId >= 0 && firstEntry >= 0); + return new LedgerEntriesIterable(ledgerId, firstEntry, lastEntry); + } + + class LedgerEntriesIterable implements Iterable<LedgerEntry> { + final long ledgerId; + final long firstEntryId; + final long lastEntryId; + + public LedgerEntriesIterable(long ledgerId, long firstEntry) { + this(ledgerId, firstEntry, -1); + } + + public LedgerEntriesIterable(long ledgerId, long firstEntry, long lastEntry) { + this.ledgerId = ledgerId; + this.firstEntryId = firstEntry; + this.lastEntryId = lastEntry; + } + + @Override + public Iterator<LedgerEntry> iterator() { + try { + return new LedgerEntriesIterator(ledgerId, firstEntryId, lastEntryId); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + class LedgerEntriesIterator implements Iterator<LedgerEntry> { + final LedgerHandle handle; + final long ledgerId; + final long lastEntryId; + + long nextEntryId; + LedgerEntry currentEntry; + + public LedgerEntriesIterator(long ledgerId, long firstEntry, long lastEntry) + throws InterruptedException, BKException { + this.handle = openLedgerNoRecovery(ledgerId); + this.ledgerId = ledgerId; + this.nextEntryId = firstEntry; + this.lastEntryId = lastEntry; + this.currentEntry = null; + } + + @Override + public boolean hasNext() { + if (currentEntry != null) { + return true; + } + if (lastEntryId == -1 || nextEntryId <= lastEntryId) { + try { + SyncCounter counter = new SyncCounter(); + counter.inc(); + + handle.asyncReadEntriesInternal(nextEntryId, nextEntryId, new LedgerHandle.SyncReadCallback(), + counter); + counter.block(0); + if (counter.getrc() != 
BKException.Code.OK) { + throw BKException.create(counter.getrc()); + } + currentEntry = counter.getSequence().nextElement(); + return true; + } catch (Exception e) { + if (e instanceof BKException.BKNoSuchEntryException && lastEntryId == -1) { + // there are no more entries in the ledger, so we just return false and ignore this exception + // since the last entry id was undefined + close(); + return false; + } + LOG.error("Error reading entry {} from ledger {}", new Object[] { nextEntryId, ledgerId }, e); + close(); + throw new RuntimeException(e); + } + } + close(); + return false; + } + + @Override + public LedgerEntry next() { + ++nextEntryId; + LedgerEntry entry = currentEntry; + currentEntry = null; + return entry; + } + + @Override + public void remove() { + // noop + } + + private void close() { + if (handle != null) { + try { + handle.close(); + } catch (Exception e) { + LOG.error("Error closing ledger handle {}", handle, e); + } + } + } + } + // Object used for calling async methods and waiting for them to complete. 
static class SyncObject { boolean value; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a13d75d7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 12d689c..9af2db7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -458,6 +458,10 @@ public class LedgerHandle { return; } + asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx); + } + + void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { try { new PendingReadOp(this, bk.scheduler, firstEntry, lastEntry, cb, ctx).initiate(); @@ -1294,7 +1298,7 @@ public class LedgerHandle { } } - private static class SyncReadCallback implements ReadCallback { + static class SyncReadCallback implements ReadCallback { /** * Implementation of callback interface for synchronous read method. * @@ -1346,7 +1350,7 @@ public class LedgerHandle { } } - private static class SyncReadLastConfirmedCallback implements ReadLastConfirmedCallback { + static class SyncReadLastConfirmedCallback implements ReadLastConfirmedCallback { /** * Implementation of callback interface for synchronous read last confirmed method. */ @@ -1362,7 +1366,7 @@ public class LedgerHandle { } } - private static class SyncCloseCallback implements CloseCallback { + static class SyncCloseCallback implements CloseCallback { /** * Close callback method *
