This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b088df620a [stats][bookie] Add bookie sanity test metrics (#3924)
b088df620a is described below

commit b088df620a154ca0f01d729212686ed6852d157d
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Apr 19 18:31:10 2023 -0700

    [stats][bookie] Add bookie sanity test metrics (#3924)
    
    ### Motivation
    
    It solves #3923
    
    Right now, Bookie metrics (Prometheus) don't show if Bookie's sanity is 
passing or not. and with container-based Prometheus bookie metrics which are 
used for monitoring, it will be really useful if metrics also show the bookie 
sanity state so, Prometheus can automatically scrape the bookie sanity state 
and one can set up monitoring on it.
    Therefore, add support to include the bookie sanity state in the bookie 
metrics.
    For example:
    ```
    URL: http://localhost:8000/metrics
    :
    # 1 shows bookie sanity is passing, 0 failed, -1 unknown
    bookie_SERVER_SANITY 1
    ```
    
    ### Changes
    
    - Add periodic task which performs sanity command on bookie and captures 
bookie sanity test periodically
    - Reuse the `SanityTestCommnad` for this purpose but SanityTestCommand has 
lot of blocking IO which can cause thread blocking for metrics collection.
    - Therefore, implement async method into `SanityTestCommnad` to avoid 
thread blocking and use it in metrics collection.
    
    Master Issue: #3923
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   1 +
 .../bookkeeper/bookie/BookieStateManager.java      |  39 +++++-
 .../bookkeeper/conf/ServerConfiguration.java       |  23 ++++
 .../cli/commands/bookie/SanityTestCommand.java     | 150 +++++++++++++++------
 conf/bk_server.conf                                |   3 +
 site3/website/docs/admin/metrics.md                |   1 +
 site3/website/docs/reference/config.md             |   3 +-
 .../cli/commands/bookie/SanityTestCommandTest.java |  56 ++++++--
 8 files changed, 224 insertions(+), 52 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d4657d2036..23ebec3f2b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -31,6 +31,7 @@ public interface BookKeeperServerStats {
     String BOOKIE_SCOPE = "bookie";
 
     String SERVER_STATUS = "SERVER_STATUS";
+    String SERVER_SANITY = "SERVER_SANITY";
 
     //
     // Network Stats (scoped under SERVER_SCOPE)
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
index 80226bd38e..fbe7336883 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SANITY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -33,10 +34,12 @@ import java.io.UncheckedIOException;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -47,10 +50,12 @@ import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * An implementation of StateManager.
  */
@@ -67,7 +72,7 @@ public class BookieStateManager implements StateManager {
     private final List<File> statusDirs;
 
     // use an executor to execute the state changes task
-    final ExecutorService stateService = Executors.newSingleThreadExecutor(
+    final ScheduledExecutorService stateService = 
Executors.newScheduledThreadPool(1,
             new 
ThreadFactoryBuilder().setNameFormat("BookieStateManagerService-%d").build());
 
     // Running flag
@@ -78,6 +83,7 @@ public class BookieStateManager implements StateManager {
     private final BookieStatus bookieStatus = new BookieStatus();
     private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
     private final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
+    private final AtomicInteger sanityPassed = new AtomicInteger(-1);
     private volatile boolean availableForHighPriorityWrites = true;
 
     private final Supplier<BookieId> bookieIdSupplier;
@@ -89,6 +95,11 @@ public class BookieStateManager implements StateManager {
         help = "Bookie status (1: up, 0: readonly, -1: unregistered)"
     )
     private final Gauge<Number> serverStatusGauge;
+    @StatsDoc(
+        name = SERVER_SANITY,
+        help = "Bookie sanity (1: up, 0: down, -1: unknown)"
+    )
+    private final Gauge<Number> serverSanityGauge;
 
     public BookieStateManager(ServerConfiguration conf,
                               StatsLogger statsLogger,
@@ -149,6 +160,30 @@ public class BookieStateManager implements StateManager {
             }
         };
         statsLogger.registerGauge(SERVER_STATUS, serverStatusGauge);
+        this.serverSanityGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return -1;
+            }
+
+            @Override
+            public Number getSample() {
+                return sanityPassed.get();
+            }
+        };
+        statsLogger.registerGauge(SERVER_SANITY, serverSanityGauge);
+        stateService.scheduleAtFixedRate(() -> {
+            if (isReadOnly()) {
+                sanityPassed.set(1);
+                return;
+            }
+            SanityTestCommand.handleAsync(conf, new 
SanityTestCommand.SanityFlags()).thenAccept(__ -> {
+                sanityPassed.set(1);
+            }).exceptionally(ex -> {
+                sanityPassed.set(0);
+                return null;
+            });
+        }, 60, 60, TimeUnit.SECONDS);
     }
 
     private boolean isRegistrationManagerDisabled() {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index cf74f6af01..3b58610848 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -251,6 +251,7 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     // Statistics Parameters
     protected static final String ENABLE_STATISTICS = "enableStatistics";
     protected static final String STATS_PROVIDER_CLASS = "statsProviderClass";
+    protected static final String SANITY_CHECK_METRICS_ENABLED = 
"sanityCheckMetricsEnabled";
 
 
     // Rx adaptive ByteBuf allocator parameters
@@ -3143,6 +3144,28 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
         return this;
     }
 
+
+    /**
+     * Flag to enable sanity check metrics in bookie stats. Defaults to 
false/disabled.
+     *
+     * @return true, if bookie collects sanity check metrics in stats
+     */
+    public boolean isSanityCheckMetricsEnabled() {
+        return getBoolean(SANITY_CHECK_METRICS_ENABLED, false);
+    }
+
+    /**
+     * Enable sanity check metrics in bookie stats.
+     *
+     * @param sanityCheckMetricsEnabled
+     *          flag to enable sanity check metrics
+     * @return server configuration
+     */
+    public ServerConfiguration setSanityCheckMetricsEnabled(boolean 
sanityCheckMetricsEnabled) {
+        setProperty(SANITY_CHECK_METRICS_ENABLED, sanityCheckMetricsEnabled);
+        return this;
+    }
+
     /**
      * Validate the configuration.
      * @throws ConfigurationException
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommand.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommand.java
index c1a81fd743..9d7f3ccd27 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommand.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommand.java
@@ -22,13 +22,18 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.beust.jcommander.Parameter;
 import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.util.Enumeration;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import lombok.Setter;
 import lombok.experimental.Accessors;
 import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import 
org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand.SanityFlags;
@@ -81,59 +86,126 @@ public class SanityTestCommand extends 
BookieCommand<SanityFlags> {
         }
     }
 
-    private boolean handle(ServerConfiguration conf, SanityFlags cmdFlags) 
throws Exception {
+    private static boolean handle(ServerConfiguration conf, SanityFlags 
cmdFlags) throws Exception {
+        try {
+            return handleAsync(conf, cmdFlags).get();
+        } catch (Exception e) {
+            LOG.warn("Error in bookie sanity test", e);
+            return false;
+        }
+    }
+
+    public static CompletableFuture<Boolean> handleAsync(ServerConfiguration 
conf, SanityFlags cmdFlags) {
+        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
         ClientConfiguration clientConf = new ClientConfiguration();
         clientConf.addConfiguration(conf);
         
clientConf.setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class);
         clientConf.setAddEntryTimeout(cmdFlags.timeout);
         clientConf.setReadEntryTimeout(cmdFlags.timeout);
 
-        BookKeeper bk = new BookKeeper(clientConf);
-        LedgerHandle lh = null;
+        BookKeeper bk;
         try {
-            lh = bk.createLedger(1, 1, BookKeeper.DigestType.MAC, new byte[0]);
-            LOG.info("Create ledger {}", lh.getId());
+            bk = new BookKeeper(clientConf);
+        } catch (BKException | IOException | InterruptedException e) {
+            LOG.warn("Failed to initialize bookkeeper client", e);
+            result.completeExceptionally(e);
+            return result;
+        }
 
+        bk.asyncCreateLedger(1, 1, BookKeeper.DigestType.MAC, new byte[0], 
(rc, lh, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                LOG.warn("ledger creation failed for sanity command {}", rc);
+                result.completeExceptionally(BKException.create(rc));
+                return;
+            }
+            List<CompletableFuture<Void>> entriesFutures = new ArrayList<>();
             for (int i = 0; i < cmdFlags.entries; i++) {
                 String content = "entry-" + i;
-                lh.addEntry(content.getBytes(UTF_8));
-            }
-
-            LOG.info("Written {} entries in ledger {}", cmdFlags.entries, 
lh.getId());
-
-            // Reopen the ledger and read entries
-            lh = bk.openLedger(lh.getId(), BookKeeper.DigestType.MAC, new 
byte[0]);
-            if (lh.getLastAddConfirmed() != (cmdFlags.entries - 1)) {
-                throw new Exception("Invalid last entry found on ledger. 
expecting: " + (cmdFlags.entries - 1)
-                                        + " -- found: " + 
lh.getLastAddConfirmed());
+                CompletableFuture<Void> entryFuture = new 
CompletableFuture<>();
+                entriesFutures.add(entryFuture);
+                lh.asyncAddEntry(content.getBytes(UTF_8), (arc, alh, entryId, 
actx) -> {
+                    if (arc != BKException.Code.OK) {
+                        LOG.warn("ledger add entry failed for {}-{}", 
alh.getId(), arc);
+                        
entryFuture.completeExceptionally(BKException.create(arc));
+                        return;
+                    }
+                    entryFuture.complete(null);
+                }, null);
             }
+            CompletableFuture<LedgerHandle> lhFuture = new 
CompletableFuture<>();
+            CompletableFuture<Void> readEntryFuture = new 
CompletableFuture<>();
+
+            FutureUtils.collect(entriesFutures).thenCompose(_r -> {
+                bk.asyncOpenLedger(lh.getId(), BookKeeper.DigestType.MAC, new 
byte[0], (orc, olh, octx) -> {
+                    if (orc != BKException.Code.OK) {
+                        LOG.warn("open sanity ledger failed for {}-{}", 
lh.getId(), orc);
+                        
lhFuture.completeExceptionally(BKException.create(orc));
+                        return;
+                    }
+                    long lac = olh.getLastAddConfirmed();
+                    if (lac != (cmdFlags.entries - 1)) {
+                        lhFuture.completeExceptionally(new Exception("Invalid 
last entry found on ledger. expecting: "
+                                + (cmdFlags.entries - 1) + " -- found: " + 
lac));
+                        return;
+                    }
+                    lhFuture.complete(lh);
+                }, null);
+                return lhFuture;
+            }).thenCompose(rlh -> {
+                rlh.asyncReadEntries(0, cmdFlags.entries - 1, (rrc, rlh2, 
entries, rctx) -> {
+                    if (rrc != BKException.Code.OK) {
+                        LOG.warn("reading sanity ledger failed for {}-{}", 
lh.getId(), rrc);
+                        
readEntryFuture.completeExceptionally(BKException.create(rrc));
+                        return;
+                    }
+                    int i = 0;
+                    while (entries.hasMoreElements()) {
+                        LedgerEntry entry = entries.nextElement();
+                        String actualMsg = new String(entry.getEntry(), UTF_8);
+                        String expectedMsg = "entry-" + (i++);
+                        if (!expectedMsg.equals(actualMsg)) {
+                            readEntryFuture.completeExceptionally(
+                                    new Exception("Failed validation of 
received message - Expected: " + expectedMsg
+                                            + ", Actual: " + actualMsg));
+                            return;
+                        }
+                    }
+                    LOG.info("Read {} entries from ledger {}", i, lh.getId());
+                    LOG.info("Bookie sanity test succeeded");
+                    readEntryFuture.complete(null);
+                }, null);
+                return readEntryFuture;
+            }).thenAccept(_r -> {
+                close(bk, lh);
+                result.complete(true);
+            }).exceptionally(ex -> {
+                close(bk, lh);
+                result.completeExceptionally(ex.getCause());
+                return null;
+            });
+        }, null);
+        return result;
+    }
 
-            Enumeration<LedgerEntry> entries = lh.readEntries(0, 
cmdFlags.entries - 1);
-            int i = 0;
-            while (entries.hasMoreElements()) {
-                LedgerEntry entry = entries.nextElement();
-                String actualMsg = new String(entry.getEntry(), UTF_8);
-                String expectedMsg = "entry-" + (i++);
-                if (!expectedMsg.equals(actualMsg)) {
-                    throw new Exception("Failed validation of received message 
- Expected: " + expectedMsg
-                                            + ", Actual: " + actualMsg);
+    public static void close(BookKeeper bk, LedgerHandle lh) {
+        if (lh != null) {
+            bk.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
+                if (rc != BKException.Code.OK) {
+                    LOG.info("Failed to delete ledger id {}", lh.getId());
                 }
-            }
-
-            LOG.info("Read {} entries from ledger {}", i, lh.getId());
-        } catch (Exception e) {
-            LOG.warn("Error in bookie sanity test", e);
-            return false;
-        } finally {
-            if (lh != null) {
-                bk.deleteLedger(lh.getId());
-                LOG.info("Deleted ledger {}", lh.getId());
-            }
+                close(bk);
+            }, null);
+        } else {
+            close(bk);
+        }
+    }
 
+    private static void close(BookKeeper bk) {
+        try {
             bk.close();
+        } catch (Exception e) {
+            LOG.info("Failed to close bookkeeper client {}", e.getMessage(), 
e);
         }
-
-        LOG.info("Bookie sanity test succeeded");
-        return true;
     }
+
 }
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index daccbb7f9e..18c553fba0 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -882,6 +882,9 @@ zkEnableSecurity=false
 # Whether statistics are enabled
 # enableStatistics=true
 
+# Flag to enable sanity check metrics in bookie stats
+# sanityCheckMetricsEnabled=false
+
 # The flag to enable recording task execution stats.
 # enableTaskExecutionStats=false
 
diff --git a/site3/website/docs/admin/metrics.md 
b/site3/website/docs/admin/metrics.md
index eeb7cc6772..b52d4499ee 100644
--- a/site3/website/docs/admin/metrics.md
+++ b/site3/website/docs/admin/metrics.md
@@ -23,6 +23,7 @@ Two stats-related [configuration 
parameters](../reference/config/) are available
 Parameter | Description | Default
 :---------|:------------|:-------
 `enableStatistics` | Whether statistics are enabled for the bookie | `false`
+`sanityCheckMetricsEnabled` | Flag to enable sanity check metrics in bookie 
stats | `false`
 `statsProviderClass` | The stats provider class used by the bookie | 
`org.apache.bookkeeper.stats.CodahaleMetricsProvider`
 
 
diff --git a/site3/website/docs/reference/config.md 
b/site3/website/docs/reference/config.md
index 77db2bf1a6..11953b80b3 100644
--- a/site3/website/docs/reference/config.md
+++ b/site3/website/docs/reference/config.md
@@ -276,7 +276,8 @@ The table below lists parameters that you can set to 
configure bookies. All conf
 
 | Parameter | Description | Default
 | --------- | ----------- | ------- | 
-| enableStatistics | Whether statistics are enabled for the bookie. | true | 
+| enableStatistics | Whether statistics are enabled for the bookie. | true |
+| sanityCheckMetricsEnabled | Flag to enable sanity check metrics in bookie 
stats. | false |
 | statsProviderClass | Stats provider class.<br />Options:<br /> - Prometheus  
  : org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider<br /> - 
Codahale     : org.apache.bookkeeper.stats.codahale.CodahaleMetricsProvider<br 
/> - Twitter Finagle  : 
org.apache.bookkeeper.stats.twitter.finagle.FinagleStatsProvider<br /> - 
Twitter Ostrich  : 
org.apache.bookkeeper.stats.twitter.ostrich.OstrichProvider<br /> - Twitter 
Science  : org.apache.bookkeeper.stats.twitter.science.TwitterSta [...]
 | limitStatsLogging | option to limit stats logging | true | 
 
diff --git 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommandTest.java
 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommandTest.java
index 4b1f0c9156..a0a0d33a5b 100644
--- 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommandTest.java
+++ 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommandTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -33,6 +34,12 @@ import 
com.google.common.util.concurrent.UncheckedExecutionException;
 import java.util.Enumeration;
 import java.util.Vector;
 import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -40,6 +47,8 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
 import org.apache.commons.configuration.Configuration;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Test for sanity command.
@@ -59,15 +68,42 @@ public class SanityTestCommandTest extends 
BookieCommandTestBase {
         lh = mock(LedgerHandle.class);
         mockClientConfigurationConstruction();
         mockConstruction(BookKeeper.class, (bk, context) -> {
-            when(bk.createLedger(anyInt(), anyInt(), 
any(BookKeeper.DigestType.class), eq(new byte[0]))).thenReturn(lh);
-            when(bk.openLedger(anyLong(), any(BookKeeper.DigestType.class), 
eq(new byte[0]))).thenReturn(lh);
+            doAnswer(new Answer<Void>() {
+                public Void answer(InvocationOnMock invocation) {
+                    ((CreateCallback) 
invocation.getArguments()[4]).createComplete(BKException.Code.OK, lh,
+                            null);
+                    return null;
+                }
+            }).when(bk).asyncCreateLedger(anyInt(), anyInt(), 
any(BookKeeper.DigestType.class), eq(new byte[0]),
+                    any(CreateCallback.class), any());
+            doAnswer(new Answer<Void>() {
+                public Void answer(InvocationOnMock invocation) {
+                    ((OpenCallback) 
invocation.getArguments()[3]).openComplete(BKException.Code.OK, lh,
+                            null);
+                    return null;
+                }
+            }).when(bk).asyncOpenLedger(anyLong(), 
any(BookKeeper.DigestType.class), eq(new byte[0]),
+                    any(OpenCallback.class), any());
         });
 
         when(lh.getLastAddConfirmed()).thenReturn(9L);
         Enumeration<LedgerEntry> entryEnumeration = getEntry();
-        when(lh.readEntries(anyLong(), 
anyLong())).thenReturn(entryEnumeration);
         when(lh.getId()).thenReturn(1L);
 
+        doAnswer(new Answer<Void>() {
+            public Void answer(InvocationOnMock invocation) {
+                ((ReadCallback) 
invocation.getArguments()[2]).readComplete(BKException.Code.OK, lh,
+                        entryEnumeration, null);
+                return null;
+            }
+        }).when(lh).asyncReadEntries(anyLong(), anyLong(), 
any(ReadCallback.class), any());
+        doAnswer(new Answer<Void>() {
+            public Void answer(InvocationOnMock invocation) {
+                ((AddCallback) 
invocation.getArguments()[1]).addComplete(BKException.Code.OK, lh,
+                        0, null);
+                return null;
+            }
+        }).when(lh).asyncAddEntry(any(byte[].class), any(AddCallback.class), 
any());
     }
 
     private Enumeration<LedgerEntry> getEntry() {
@@ -106,8 +142,8 @@ public class SanityTestCommandTest extends 
BookieCommandTestBase {
                     
getMockedConstruction(ClientConfiguration.class).constructed().get(0);
             verify(clientConf, times(1)).setAddEntryTimeout(1);
             verify(clientConf, times(1)).setReadEntryTimeout(1);
-            verify(lh, times(1)).addEntry(any());
-            verify(lh, times(1)).readEntries(0, 0);
+            verify(lh, times(1)).asyncAddEntry(any(byte[].class), 
any(AddCallback.class), any());
+            verify(lh, times(1)).asyncReadEntries(eq(0L), eq(0L), 
any(ReadCallback.class), any());
         } catch (Exception e) {
             throw new UncheckedExecutionException(e.getMessage(), e);
         }
@@ -134,12 +170,12 @@ public class SanityTestCommandTest extends 
BookieCommandTestBase {
             verify(clientConf, times(1))
                     
.setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class);
             final BookKeeper bk = 
getMockedConstruction(BookKeeper.class).constructed().get(0);
-            verify(bk, times(1)).createLedger(1, 1, BookKeeper.DigestType.MAC, 
new byte[0]);
-            verify(lh, times(6)).getId();
-            verify(bk, times(1)).openLedger(anyLong(), 
eq(BookKeeper.DigestType.MAC), eq(new byte[0]));
+            verify(bk, times(1)).asyncCreateLedger(eq(1), eq(1), 
eq(BookKeeper.DigestType.MAC), eq(new byte[0]),
+                    any(CreateCallback.class), any());
+            verify(bk, times(1)).asyncOpenLedger(anyLong(), 
eq(BookKeeper.DigestType.MAC), eq(new byte[0]),
+                    any(OpenCallback.class), any());
             verify(lh, times(1)).getLastAddConfirmed();
-            verify(bk, times(1)).deleteLedger(anyLong());
-            verify(bk, times(1)).close();
+            verify(bk, times(1)).asyncDeleteLedger(anyLong(), 
any(DeleteCallback.class), any());
         } catch (Exception e) {
             throw new UncheckedExecutionException(e.getMessage(), e);
         }

Reply via email to