This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4487f74 [MERGE YAHOO REPO] CMS-1487: Provide tool to read entries
from a particular bookie
4487f74 is described below
commit 4487f7447ab90a8a1b80cacb5f77c8f6760bb056
Author: Jia Zhai <[email protected]>
AuthorDate: Fri Jan 19 01:10:16 2018 -0800
[MERGE YAHOO REPO] CMS-1487: Provide tool to read entries from a particular
bookie
Descriptions of the changes in this PR:
CMS-1487: Provide tool to read entries from a particular bookie.
Here is the original commit:
https://github.com/yahoo/bookkeeper/commit/1a884b02
Author: Jia Zhai <[email protected]>
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #1009 from jiazhai/cherry_picks/i_115
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 90 ++++++++++++++++++----
1 file changed, 74 insertions(+), 16 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index fcd6329..5235110 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -26,7 +26,11 @@ import com.google.common.collect.Maps;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.AbstractFuture;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -56,8 +60,12 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -85,6 +93,7 @@ import
org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -94,6 +103,7 @@ import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.LedgerIdFormatter;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.Tool;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
@@ -570,7 +580,9 @@ public class BookieShell implements Tool {
lOpts.addOption("l", "ledgerid", true, "Ledger ID");
lOpts.addOption("fe", "firstentryid", true, "First EntryID");
lOpts.addOption("le", "lastentryid", true, "Last EntryID");
- lOpts.addOption("r", "force-recovery", false, "Ensure the ledger
is properly closed before reading");
+ lOpts.addOption("r", "force-recovery", false,
+ "Ensure the ledger is properly closed before reading");
+ lOpts.addOption("b", "bookie", true, "Only read from a specific
bookie");
}
@Override
@@ -585,7 +597,7 @@ public class BookieShell implements Tool {
@Override
String getUsage() {
- return "readledger [-msg] -ledgerid <ledgerid> "
+ return "readledger [-bookie <address:port>] [-msg] -ledgerid
<ledgerid> "
+ "[-firstentryid <firstentryid> [-lastentryid
<lastentryid>]] "
+ "[-force-recovery]";
}
@@ -603,13 +615,18 @@ public class BookieShell implements Tool {
boolean printMsg = cmdLine.hasOption("m");
boolean forceRecovery = cmdLine.hasOption("r");
+ final BookieSocketAddress bookie;
+ if (cmdLine.hasOption("b")) {
+ // A particular bookie was specified
+ bookie = new BookieSocketAddress(cmdLine.getOptionValue("b"));
+ } else {
+ bookie = null;
+ }
ClientConfiguration conf = new ClientConfiguration();
conf.addConfiguration(bkConf);
- BookKeeperAdmin bk = null;
- try {
- bk = new BookKeeperAdmin(conf);
+ try (BookKeeperAdmin bk = new BookKeeperAdmin(conf)) {
if (forceRecovery) {
// Force the opening of the ledger to trigger recovery
try (LedgerHandle lh = bk.openLedger(ledgerId)) {
@@ -619,17 +636,58 @@ public class BookieShell implements Tool {
}
}
- Iterator<LedgerEntry> entries = bk.readEntries(ledgerId,
firstEntry, lastEntry).iterator();
- while (entries.hasNext()) {
- LedgerEntry entry = entries.next();
- formatEntry(entry, printMsg);
- }
- } catch (Exception e) {
- LOG.error("Error reading entries from ledger {}", ledgerId, e);
- return -1;
- } finally {
- if (bk != null) {
- bk.close();
+ if (bookie == null) {
+ // No bookie was specified, use normal bk client
+ Iterator<LedgerEntry> entries = bk.readEntries(ledgerId,
firstEntry, lastEntry).iterator();
+ while (entries.hasNext()) {
+ LedgerEntry entry = entries.next();
+ formatEntry(entry, printMsg);
+ }
+ } else {
+ // Use BookieClient to target a specific bookie
+ EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+ OrderedSafeExecutor executor =
OrderedSafeExecutor.newBuilder()
+ .numThreads(1)
+ .name("BookieClientScheduler")
+ .build();
+
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
+ new
DefaultThreadFactory("BookKeeperClientSchedulerPool"));
+
+ BookieClient bookieClient = new BookieClient(conf,
eventLoopGroup, executor,
+ scheduler, NullStatsLogger.INSTANCE);
+
+ LongStream.range(firstEntry, lastEntry).forEach(entryId ->
{
+ CompletableFuture<Void> future = new
CompletableFuture<>();
+
+ bookieClient.readEntry(bookie, ledgerId, entryId,
+ (rc, ledgerId1, entryId1, buffer, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ LOG.error("Failed to read entry {} -- {}",
entryId1, BKException.getMessage(rc));
+
future.completeExceptionally(BKException.create(rc));
+ return;
+ }
+
+ System.out.println("--------- Lid=" +
ledgerIdFormatter.formatLedgerId(ledgerId)
+ + ", Eid=" + entryId + " ---------");
+ if (printMsg) {
+ System.out.println("Data: " +
ByteBufUtil.prettyHexDump(buffer));
+ }
+
+ buffer.release();
+ future.complete(null);
+ }, null);
+
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.error("Error future.get while reading entries
from ledger {}", ledgerId, e);
+ }
+ });
+
+ eventLoopGroup.shutdownGracefully();
+ executor.shutdown();
+ bookieClient.close();
}
}
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.