sijie closed pull request #1057: [Merge Yahoo repo]: Support DbLedgerStorage in 
LedgerCmd to get list of logger files for a given ledgerId 
URL: https://github.com/apache/bookkeeper/pull/1057
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based (foreign) pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 62b4992b9..1070a1686 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -674,12 +674,26 @@ public int runCmd(CommandLine cmdLine) throws Exception {
                 printUsage();
                 return -1;
             }
-            if (printMeta) {
-                // print meta
-                readLedgerMeta(ledgerId);
+
+            if 
(bkConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName())) {
+                // dump ledger info
+                try {
+                    DbLedgerStorage.readLedgerIndexEntries(ledgerId, bkConf,
+                            (currentEntry, entryLogId, position) -> 
System.out.println(
+                                    "entry " + currentEntry + "\t:\t(log: " + 
entryLogId + ", pos: " + position + ")"));
+                } catch (IOException e) {
+                    System.err.printf("ERROR: initializing dbLedgerStorage 
%s", e.getMessage());
+                    return -1;
+                }
+            } else {
+                if (printMeta) {
+                    // print meta
+                    readLedgerMeta(ledgerId);
+                }
+                // dump ledger info
+                readLedgerIndexEntries(ledgerId);
             }
-            // dump ledger info
-            readLedgerIndexEntries(ledgerId);
+
             return 0;
         }
 
@@ -3001,7 +3015,7 @@ protected void readLedgerMeta(long ledgerId) throws 
Exception {
     }
 
     /**
-     * Read ledger index entires.
+     * Read ledger index entries.
      *
      * @param ledgerId Ledger Id
      * @throws IOException
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index cb295d5cf..ec7a3b7f5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -21,14 +21,13 @@
 package org.apache.bookkeeper.bookie.storage.ldb;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,7 +39,6 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -56,13 +54,16 @@
 import org.apache.bookkeeper.bookie.StateManager;
 import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -773,7 +774,6 @@ public long addLedgerToIndex(long ledgerId, boolean 
isFenced, byte[] masterKey,
 
         return numberOfEntries.get();
     }
-
     @Override
     public void registerLedgerDeletionListener(LedgerDeletionListener 
listener) {
         ledgerDeletionListeners.add(listener);
@@ -791,5 +791,50 @@ private void recordFailedEvent(OpStatsLogger logger, long 
startTimeNanos) {
         logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
     }
 
+    /**
+     * Reports, for each indexed entry of a ledger, its entry log id and position.
+     *
+     * @param ledgerId id of the ledger whose entry locations are looked up
+     * @param serverConf bookie configuration; must not be null
+     * @param processor callback invoked once per located entry; must not be null
+     * @throws IOException propagated from the entry location index operations
+     */
+    public static void readLedgerIndexEntries(long ledgerId, 
ServerConfiguration serverConf,
+            LedgerLoggerProcessor processor) throws IOException {
+
+        checkNotNull(serverConf, "ServerConfiguration can't be null");
+        checkNotNull(processor, "LedgerLoggger info processor can't null");
+
+        LedgerDirsManager ledgerDirsManager = new 
LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
+            new DiskChecker(serverConf.getDiskUsageThreshold(), 
serverConf.getDiskUsageWarnThreshold()));
+        String ledgerBasePath = 
ledgerDirsManager.getAllLedgerDirs().get(0).toString(); // only the first ledger directory is used
+
+        EntryLocationIndex entryLocationIndex = new 
EntryLocationIndex(serverConf,
+                (path, dbConfigType, conf1) -> new 
KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
+                ledgerBasePath, NullStatsLogger.INSTANCE);
+        try {
+            long lastEntryId = 
entryLocationIndex.getLastEntryInLedger(ledgerId);
+            for (long currentEntry = 0; currentEntry <= lastEntryId; 
currentEntry++) {
+                long offset = entryLocationIndex.getLocation(ledgerId, 
currentEntry);
+                if (offset <= 0) {
+                    // no location recorded for this entry id; skip it without calling the processor
+                    continue;
+                }
+                long entryLogId = offset >> 32L; // upper 32 bits of the location
+                long position = offset & 0xffffffffL; // lower 32 bits of the location
+                processor.process(currentEntry, entryLogId, position);
+            }
+        } finally {
+            entryLocationIndex.close(); // always release the index, even if a lookup or the processor throws
+        }
+    }
+
+    /**
+     * Callback that receives the entry log id and position located for one ledger entry.
+     */
+    public interface LedgerLoggerProcessor {
+        void process(long entryId, long entryLogId, long position);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(DbLedgerStorage.class);
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
new file mode 100644
index 000000000..405591793
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.bookie.BookieAccessor;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the bookie shell {@code ledger} command on a bookie configured with DbLedgerStorage.
+ */
+public class LedgerCmdTest extends BookKeeperClusterTestCase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(LedgerCmdTest.class);
+    private DigestType digestType = DigestType.CRC32;
+    private static final String PASSWORD = "testPasswd";
+
+    public LedgerCmdTest() {
+        super(1); // presumably the number of bookies in the test cluster — confirm against the base class
+        baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        baseConf.setGcWaitTime(60000);
+        baseConf.setFlushInterval(1);
+    }
+
+
+    /**
+     * Writes 10 entries to a new ledger, then expects the shell's ledger command to exit with 0.
+     */
+    @Test
+    public void testLedgerDbStorageCmd() throws Exception {
+
+        BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+        LOG.info("Create ledger and add entries to it");
+        LedgerHandle lh1 = createLedgerWithEntries(bk, 10);
+
+        bs.forEach(bookieServer -> { // force-flush every bookie; a failure is only logged
+            try {
+                BookieAccessor.forceFlush(bookieServer.getBookie());
+            } catch (IOException e) {
+                LOG.error("Error forceFlush:", e);
+            }
+        });
+
+        String[] argv = { "ledger", Long.toString(lh1.getId()) }; // i.e. "ledger <ledgerId>"
+        final ServerConfiguration conf = bsConfs.get(0);
+        conf.setUseHostNameAsBookieID(true);
+
+        BookieShell bkShell =
+            new BookieShell(LedgerIdFormatter.LONG_LEDGERID_FORMATTER, 
EntryFormatter.STRING_FORMATTER);
+        bkShell.setConf(conf);
+
+        assertEquals("Failed to return exit code!", 0, bkShell.run(argv));
+
+    }
+
+    private LedgerHandle createLedgerWithEntries(BookKeeper bk, int 
numOfEntries) throws Exception { // adds numOfEntries entries asynchronously, then waits for the callbacks
+        LedgerHandle lh = bk.createLedger(1, 1, digestType, 
PASSWORD.getBytes());
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+        final CountDownLatch latch = new CountDownLatch(numOfEntries);
+
+        final AddCallback cb = new AddCallback() {
+            public void addComplete(int rccb, LedgerHandle lh, long entryId, 
Object ctx) {
+                rc.compareAndSet(BKException.Code.OK, rccb); // keeps the first non-OK result code
+                latch.countDown();
+            }
+        };
+        for (int i = 0; i < numOfEntries; i++) {
+            lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
+        }
+        if (!latch.await(30, TimeUnit.SECONDS)) { // one countDown is expected per added entry
+            throw new Exception("Entries took too long to add");
+        }
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+        return lh;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to