This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4487f74  [MERGE YAHOO REPO] CMS-1487: Provide tool to read entries 
from a particular bookie
4487f74 is described below

commit 4487f7447ab90a8a1b80cacb5f77c8f6760bb056
Author: Jia Zhai <[email protected]>
AuthorDate: Fri Jan 19 01:10:16 2018 -0800

    [MERGE YAHOO REPO] CMS-1487: Provide tool to read entries from a particular 
bookie
    
    Descriptions of the changes in this PR:
    
    CMS-1487: Provide tool to read entries from a particular bookie.
    Here is original commit:
    https://github.com/yahoo/bookkeeper/commit/1a884b02
    
    Author: Jia Zhai <[email protected]>
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1009 from jiazhai/cherry_picks/i_115
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 90 ++++++++++++++++++----
 1 file changed, 74 insertions(+), 16 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index fcd6329..5235110 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -26,7 +26,11 @@ import com.google.common.collect.Maps;
 import com.google.common.net.InetAddresses;
 import com.google.common.util.concurrent.AbstractFuture;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -56,8 +60,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.LongStream;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -85,6 +93,7 @@ import 
org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -94,6 +103,7 @@ import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.LedgerIdFormatter;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.Tool;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -570,7 +580,9 @@ public class BookieShell implements Tool {
             lOpts.addOption("l", "ledgerid", true, "Ledger ID");
             lOpts.addOption("fe", "firstentryid", true, "First EntryID");
             lOpts.addOption("le", "lastentryid", true, "Last EntryID");
-            lOpts.addOption("r", "force-recovery", false, "Ensure the ledger 
is properly closed before reading");
+            lOpts.addOption("r", "force-recovery", false,
+                "Ensure the ledger is properly closed before reading");
+            lOpts.addOption("b", "bookie", true, "Only read from a specific 
bookie");
         }
 
         @Override
@@ -585,7 +597,7 @@ public class BookieShell implements Tool {
 
         @Override
         String getUsage() {
-            return "readledger   [-msg] -ledgerid <ledgerid> "
+            return "readledger  [-bookie <address:port>]  [-msg] -ledgerid 
<ledgerid> "
                     + "[-firstentryid <firstentryid> [-lastentryid 
<lastentryid>]] "
                     + "[-force-recovery]";
         }
@@ -603,13 +615,18 @@ public class BookieShell implements Tool {
 
             boolean printMsg = cmdLine.hasOption("m");
             boolean forceRecovery = cmdLine.hasOption("r");
+            final BookieSocketAddress bookie;
+            if (cmdLine.hasOption("b")) {
+                // A particular bookie was specified
+                bookie = new BookieSocketAddress(cmdLine.getOptionValue("b"));
+            } else {
+                bookie = null;
+            }
 
             ClientConfiguration conf = new ClientConfiguration();
             conf.addConfiguration(bkConf);
 
-            BookKeeperAdmin bk = null;
-            try {
-                bk = new BookKeeperAdmin(conf);
+            try (BookKeeperAdmin bk = new BookKeeperAdmin(conf)) {
                 if (forceRecovery) {
                     // Force the opening of the ledger to trigger recovery
                     try (LedgerHandle lh = bk.openLedger(ledgerId)) {
@@ -619,17 +636,58 @@ public class BookieShell implements Tool {
                     }
                 }
 
-                Iterator<LedgerEntry> entries = bk.readEntries(ledgerId, 
firstEntry, lastEntry).iterator();
-                while (entries.hasNext()) {
-                    LedgerEntry entry = entries.next();
-                    formatEntry(entry, printMsg);
-                }
-            } catch (Exception e) {
-                LOG.error("Error reading entries from ledger {}", ledgerId, e);
-                return -1;
-            } finally {
-                if (bk != null) {
-                    bk.close();
+                if (bookie == null) {
+                    // No bookie was specified, use normal bk client
+                    Iterator<LedgerEntry> entries = bk.readEntries(ledgerId, 
firstEntry, lastEntry).iterator();
+                    while (entries.hasNext()) {
+                        LedgerEntry entry = entries.next();
+                        formatEntry(entry, printMsg);
+                    }
+                } else {
+                    // Use BookieClient to target a specific bookie
+                    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+                    OrderedSafeExecutor executor = 
OrderedSafeExecutor.newBuilder()
+                        .numThreads(1)
+                        .name("BookieClientScheduler")
+                        .build();
+
+                    ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
+                        new 
DefaultThreadFactory("BookKeeperClientSchedulerPool"));
+
+                    BookieClient bookieClient = new BookieClient(conf, 
eventLoopGroup, executor,
+                        scheduler, NullStatsLogger.INSTANCE);
+
+                    LongStream.range(firstEntry, lastEntry).forEach(entryId -> 
{
+                        CompletableFuture<Void> future = new 
CompletableFuture<>();
+
+                        bookieClient.readEntry(bookie, ledgerId, entryId,
+                            (rc, ledgerId1, entryId1, buffer, ctx) -> {
+                                if (rc != BKException.Code.OK) {
+                                    LOG.error("Failed to read entry {} -- {}", 
entryId1, BKException.getMessage(rc));
+                                    
future.completeExceptionally(BKException.create(rc));
+                                    return;
+                                }
+
+                                System.out.println("--------- Lid=" + 
ledgerIdFormatter.formatLedgerId(ledgerId)
+                                    + ", Eid=" + entryId + " ---------");
+                                if (printMsg) {
+                                    System.out.println("Data: " + 
ByteBufUtil.prettyHexDump(buffer));
+                                }
+
+                                buffer.release();
+                                future.complete(null);
+                            }, null);
+
+                        try {
+                            future.get();
+                        } catch (Exception e) {
+                            LOG.error("Error future.get while reading entries 
from ledger {}", ledgerId, e);
+                        }
+                    });
+
+                    eventLoopGroup.shutdownGracefully();
+                    executor.shutdown();
+                    bookieClient.close();
                 }
             }
 

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to