This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 97818f5  Add metrics and internal command for QueryAutoRecoveryStatus, 
including underReplicatedSize metrics,read/write latency, internal command for 
querying recovering ledgersInfo (#2768)
97818f5 is described below

commit 97818f5123999396e66f5246420d3c7e3d25f53d
Author: frankxieke <[email protected]>
AuthorDate: Mon Sep 6 09:07:47 2021 +0800

    Add metrics and internal command for QueryAutoRecoveryStatus, including 
underReplicatedSize metrics,read/write latency, internal command for querying 
recovering ledgersInfo (#2768)
    
    Motivation:
    
    Current AutoRecovery does not have enough metrics or stat command that 
would help to monitor and debug. So we need to add metrics and admin stat 
interface to monitor the process of AutoRecovery.  For example, current 
recovering ledgerInfo and under replicated size, read/write latency in 
recovering.
    
    Changes:
    
    And QueryAutoRecoveryStatus command and under replicated size metric , 
read/write latency metric in recovering
    
    Documentation:
    
    Need doc.
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  40 +++++
 .../client/LedgerFragmentReplicator.java           |  31 +++-
 .../org/apache/bookkeeper/replication/Auditor.java |  20 +++
 .../bookkeeper/replication/ReplicationStats.java   |   3 +
 .../QueryAutoRecoveryStatusCommand.java            | 151 +++++++++++++++++
 .../QueryAutoRecoveryStatusCommandTest.java        | 179 +++++++++++++++++++++
 6 files changed, 420 insertions(+), 4 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index c16efca..461623e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -47,6 +47,7 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.replication.ReplicationException;
 import 
org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand;
 import 
org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand;
+import 
org.apache.bookkeeper.tools.cli.commands.autorecovery.QueryAutoRecoveryStatusCommand;
 import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand;
 import 
org.apache.bookkeeper.tools.cli.commands.autorecovery.TriggerAuditCommand;
 import 
org.apache.bookkeeper.tools.cli.commands.autorecovery.WhoIsAuditorCommand;
@@ -155,6 +156,7 @@ public class BookieShell implements Tool {
     static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = 
"convert-to-interleaved-storage";
     static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = 
"rebuild-db-ledger-locations-index";
     static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = 
"regenerate-interleaved-storage-index-file";
+    static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus";
 
     // cookie commands
     static final String CMD_CREATE_COOKIE = "cookie_create";
@@ -1344,6 +1346,43 @@ public class BookieShell implements Tool {
         }
     }
 
+
+    /**
+     * Command to query autorecovery status.
+     */
+    class QueryAutoRecoveryStatusCmd extends MyCommand {
+        Options opts = new Options();
+
+        public QueryAutoRecoveryStatusCmd() {
+            super(CMD_QUERY_AUTORECOVERY_STATUS);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Query the autorecovery status";
+        }
+
+        @Override
+        String getUsage() {
+            return "queryautorecoverystatus";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            final boolean verbose = cmdLine.hasOption("verbose");
+            QueryAutoRecoveryStatusCommand.QFlags flags = new 
QueryAutoRecoveryStatusCommand.QFlags()
+                                                            .verbose(verbose);
+            QueryAutoRecoveryStatusCommand cmd = new 
QueryAutoRecoveryStatusCommand();
+            cmd.apply(bkConf, flags);
+            return 0;
+        }
+    }
+
     /**
      * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in 
metadata store.
      */
@@ -2153,6 +2192,7 @@ public class BookieShell implements Tool {
         commands.put(CMD_READJOURNAL, new ReadJournalCmd());
         commands.put(CMD_LASTMARK, new LastMarkCmd());
         commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd());
+        commands.put(CMD_QUERY_AUTORECOVERY_STATUS, new 
QueryAutoRecoveryStatusCmd());
         commands.put(CMD_LISTBOOKIES, new ListBookiesCmd());
         commands.put(CMD_LISTFILESONDISC, new ListDiskFilesCmd());
         commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index cd8b508..fb6259d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -24,10 +24,10 @@ import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_READ;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN;
-import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
-
+import static 
org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;;
+import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;;
+import static 
org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;
 import io.netty.buffer.Unpooled;
-
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -35,11 +35,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
-
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +91,17 @@ public class LedgerFragmentReplicator {
         help = "The distribution of size of entries written by the replicator"
     )
     private final OpStatsLogger numBytesWritten;
+    @StatsDoc(
+            name = READ_DATA_LATENCY,
+            help = "The distribution of latency of read entries by the 
replicator"
+    )
+    private final OpStatsLogger readDataLatency;
+    @StatsDoc(
+            name = WRITE_DATA_LATENCY,
+            help = "The distribution of latency of write entries by the 
replicator"
+    )
+    private final OpStatsLogger writeDataLatency;
+
 
     public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) {
         this.bkc = bkc;
@@ -98,6 +110,8 @@ public class LedgerFragmentReplicator {
         numBytesRead = this.statsLogger.getOpStatsLogger(NUM_BYTES_READ);
         numEntriesWritten = this.statsLogger.getCounter(NUM_ENTRIES_WRITTEN);
         numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN);
+        readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY);
+        writeDataLatency = 
this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY);
     }
 
     public LedgerFragmentReplicator(BookKeeper bkc) {
@@ -334,6 +348,8 @@ public class LedgerFragmentReplicator {
                 }
             }
         };
+
+        long startReadEntryTime = MathUtils.nowInNano();
         /*
          * Read the ledger entry using the LedgerHandle. This will allow us to
          * read the entry from one of the other replicated bookies other than
@@ -350,6 +366,10 @@ public class LedgerFragmentReplicator {
                     ledgerFragmentEntryMcb.processResult(rc, null, null);
                     return;
                 }
+
+                
readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime),
+                        TimeUnit.NANOSECONDS);
+
                 /*
                  * Now that we've read the ledger entry, write it to the new
                  * bookie we've selected.
@@ -364,10 +384,13 @@ public class LedgerFragmentReplicator {
                                 lh.getLastAddConfirmed(), entry.getLength(),
                                 Unpooled.wrappedBuffer(data, 0, data.length));
                 for (BookieId newBookie : newBookies) {
+                    long startWriteEntryTime = MathUtils.nowInNano();
                     bkc.getBookieClient().addEntry(newBookie, lh.getId(),
                             lh.getLedgerKey(), entryId, 
ByteBufList.clone(toSend),
                             multiWriteCallback, dataLength, 
BookieProtocol.FLAG_RECOVERY_ADD,
                             false, WriteFlag.NONE);
+                    writeDataLatency.registerSuccessfulEvent(
+                           MathUtils.elapsedNanos(startWriteEntryTime), 
TimeUnit.NANOSECONDS);
                 }
                 toSend.release();
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index f245614..064c7fc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -38,6 +38,7 @@ import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLIC
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
+import static 
org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
 import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 
@@ -71,6 +72,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
@@ -170,6 +172,12 @@ public class Auditor implements AutoCloseable {
         help = "the distribution of num under_replicated ledgers on each 
auditor run"
     )
     private final OpStatsLogger numUnderReplicatedLedger;
+
+    @StatsDoc(
+            name = UNDER_REPLICATED_LEDGERS_TOTAL_SIZE,
+            help = "the distribution of under_replicated ledgers total size on 
each auditor run"
+    )
+    private final OpStatsLogger underReplicatedLedgerTotalSize;
     @StatsDoc(
         name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE,
         help = "the latency distribution of publishing under replicated 
ledgers for lost bookies"
@@ -341,6 +349,7 @@ public class Auditor implements AutoCloseable {
         this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new 
AtomicInteger(0);
 
         numUnderReplicatedLedger = 
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
+        underReplicatedLedgerTotalSize = 
this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
         uRLPublishTimeForLostBookies = this.statsLogger
                 
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
         bookieToLedgersMapCreationTime = this.statsLogger
@@ -1131,6 +1140,17 @@ public class Auditor implements AutoCloseable {
         }
         LOG.info("Following ledgers: {} of bookie: {} are identified as 
underreplicated", ledgers, missingBookies);
         numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
+        LongAdder underReplicatedSize = new LongAdder();
+        FutureUtils.processList(
+                Lists.newArrayList(ledgers),
+                ledgerId ->
+                    
ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) 
-> {
+                        if (exception == null) {
+                            
underReplicatedSize.add(metadata.getValue().getLength());
+                        }
+                    }), null);
+        
underReplicatedLedgerTotalSize.registerSuccessfulValue(underReplicatedSize.longValue());
+
         return FutureUtils.processList(
             Lists.newArrayList(ledgers),
             ledgerId -> 
ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, 
missingBookies),
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index 6ec9f49..b0bbe47 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -30,6 +30,7 @@ public interface ReplicationStats {
     String AUDITOR_SCOPE = "auditor";
     String ELECTION_ATTEMPTS = "election_attempts";
     String NUM_UNDER_REPLICATED_LEDGERS = "NUM_UNDER_REPLICATED_LEDGERS";
+    String UNDER_REPLICATED_LEDGERS_TOTAL_SIZE = 
"UNDER_REPLICATED_LEDGERS_TOTAL_SIZE";
     String URL_PUBLISH_TIME_FOR_LOST_BOOKIE = 
"URL_PUBLISH_TIME_FOR_LOST_BOOKIE";
     String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = 
"BOOKIE_TO_LEDGERS_MAP_CREATION_TIME";
     String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME";
@@ -58,6 +59,8 @@ public interface ReplicationStats {
     String NUM_BYTES_READ = "NUM_BYTES_READ";
     String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
     String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
+    String READ_DATA_LATENCY = "READ_DATA_LATENCY";
+    String WRITE_DATA_LATENCY = "WRITE_DATA_LATENCY";
     String REPLICATE_EXCEPTION = "exceptions";
     String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = 
"NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
     String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = 
"NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
new file mode 100644
index 0000000..0f86a2d
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.autorecovery;
+
+import static 
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Command to Query current auto recovery status.
+ */
+public class QueryAutoRecoveryStatusCommand
+        extends BookieCommand<QueryAutoRecoveryStatusCommand.QFlags> {
+    static final Logger LOG = LoggerFactory.
+            getLogger(QueryAutoRecoveryStatusCommand.class);
+    private static final String NAME = "queryautorecoverystatus";
+    private static final String DESC = "Query autorecovery status.";
+
+    public QueryAutoRecoveryStatusCommand() {
+        super(CliSpec.<QFlags>newBuilder()
+                .withName(NAME)
+                .withDescription(DESC)
+                .withFlags(new QFlags())
+                .build());
+    }
+
+    @Override
+    public boolean apply(ServerConfiguration conf, QFlags cmdFlags) {
+        try {
+            return handler(conf, cmdFlags);
+        } catch (Exception e) {
+            throw new UncheckedExecutionException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Flags for list under replicated command.
+     */
+    @Accessors(fluent = true)
+    @Setter
+    public static class QFlags extends CliFlags{
+        @Parameter(names =  {"-v", "--verbose"}, description = "list 
recovering detailed ledger info")
+        private Boolean verbose = false;
+    }
+
+    private static class LedgerRecoverInfo {
+        Long ledgerId;
+        String bookieId;
+        LedgerRecoverInfo(Long ledgerId, String bookieId) {
+            this.ledgerId = ledgerId;
+            this.bookieId = bookieId;
+        }
+    }
+
+    /*
+    Print Message format is like this:
+
+     CurrentRecoverLedgerInfo:
+        LedgerId:   BookieId:     LedgerSize:(detail)
+        LedgerId:   BookieId:     LedgerSize:(detail)
+     */
+    public boolean handler(ServerConfiguration conf, QFlags flag) throws 
Exception {
+        runFunctionWithLedgerManagerFactory(conf, mFactory -> {
+            LedgerUnderreplicationManager underreplicationManager;
+            LedgerManager ledgerManager = mFactory.newLedgerManager();
+            List<LedgerRecoverInfo> ledgerList = new LinkedList<>();
+            try {
+                underreplicationManager = 
mFactory.newLedgerUnderreplicationManager();
+            } catch (KeeperException | 
ReplicationException.CompatibilityException e) {
+                throw new UncheckedExecutionException("Failed to new ledger 
underreplicated manager", e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new UncheckedExecutionException("Interrupted on newing 
ledger underreplicated manager", e);
+            }
+            Iterator<UnderreplicatedLedger> iter = 
underreplicationManager.listLedgersToRereplicate(null);
+            while (iter.hasNext()) {
+                UnderreplicatedLedger underreplicatedLedger = iter.next();
+                long urLedgerId = underreplicatedLedger.getLedgerId();
+                try {
+                    String replicationWorkerId = underreplicationManager
+                            
.getReplicationWorkerIdRereplicatingLedger(urLedgerId);
+                    if (replicationWorkerId != null) {
+                        ledgerList.add(new LedgerRecoverInfo(urLedgerId, 
replicationWorkerId));
+                    }
+                } catch (ReplicationException.UnavailableException e) {
+                    LOG.error("Failed to get ReplicationWorkerId rereplicating 
ledger {} -- {}", urLedgerId,
+                            e.getMessage());
+                }
+            }
+
+            LOG.info("CurrentRecoverLedgerInfo:");
+            if (!flag.verbose) {
+                for (int i = 0; i < ledgerList.size(); i++) {
+                    LOG.info("\tLedgerId:{}\tBookieId:{}", 
ledgerList.get(i).ledgerId, ledgerList.get(i).bookieId);
+                }
+            } else {
+                for (int i = 0; i < ledgerList.size(); i++) {
+                    LedgerRecoverInfo info = ledgerList.get(i);
+                    
ledgerManager.readLedgerMetadata(info.ledgerId).whenComplete((metadata, 
exception) -> {
+                        if (exception == null) {
+                            
LOG.info("\tLedgerId:{}\tBookieId:{}\tLedgerSize:{}",
+                                    info.ledgerId, info.bookieId, 
metadata.getValue().getLength());
+                        } else {
+                            LOG.error("Unable to read the ledger: {} 
information", info.ledgerId);
+                            throw new UncheckedExecutionException(exception);
+                        }
+                    });
+                }
+            }
+            if (ledgerList.size() == 0) {
+                // NO ledger is being auto recovering
+                LOG.info("\t No Ledger is being recovered.");
+            }
+            return null;
+        });
+        return true;
+    }
+}
diff --git 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java
 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java
new file mode 100644
index 0000000..ba92c8b
--- /dev/null
+++ 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.autorecovery;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import com.google.common.collect.Lists;
+import java.lang.reflect.Constructor;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
+import org.apache.bookkeeper.tools.cli.helpers.CommandHelpers;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+
+
+/**
+ * Unit test for {@link QueryAutoRecoveryStatusCommand}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ QueryAutoRecoveryStatusCommand.class, 
ZKMetadataDriverBase.class, ZooKeeperClient.class,
+        CommandHelpers.class, MetadataDrivers.class
+})
+public class QueryAutoRecoveryStatusCommandTest extends BookieCommandTestBase {
+    public QueryAutoRecoveryStatusCommandTest() {
+        super(3, 0);
+    }
+    LedgerUnderreplicationManager underreplicationManager;
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        BookieId bookieId = BookieId.parse(UUID.randomUUID().toString());
+        LedgerManagerFactory ledgerManagerFactory = 
mock(LedgerManagerFactory.class);
+
+        PowerMockito.mockStatic(MetadataDrivers.class);
+        PowerMockito.doAnswer(invocationOnMock -> {
+            Function<LedgerManagerFactory, ?> function = 
invocationOnMock.getArgument(1);
+            function.apply(ledgerManagerFactory);
+            return true;
+        }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory", 
any(ServerConfiguration.class),
+                any(Function.class));
+
+        LedgerManager ledgerManager = mock(LedgerManager.class);
+        underreplicationManager = mock(LedgerUnderreplicationManager.class);
+
+        
when(ledgerManagerFactory.newLedgerManager()).thenReturn(ledgerManager);
+        
when(ledgerManagerFactory.newLedgerUnderreplicationManager()).thenReturn(underreplicationManager);
+
+        List<BookieId> ensemble = Lists.newArrayList(new 
BookieSocketAddress("192.0.2.1", 1234).toBookieId(),
+                new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
+                new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
+        LedgerMetadata metadata = LedgerMetadataBuilder.create()
+                .withId(11112233)
+                .withClosedState()
+                .withLength(100000999)
+                .withLastEntryId(2000011)
+                
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2)
+                .withPassword("passwd".getBytes())
+                .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType())
+                .newEnsembleEntry(0L, ensemble).build();
+        CompletableFuture<Versioned<LedgerMetadata>> promise = new 
CompletableFuture<>();
+        Versioned<LedgerMetadata> vmeta = new 
Versioned<LedgerMetadata>(metadata, new LongVersion(1000));
+        promise.complete(vmeta);
+
+        when(ledgerManager.readLedgerMetadata(1)).thenReturn(promise);
+        when(ledgerManager.readLedgerMetadata(33232)).thenReturn(promise);
+
+        Constructor<? extends UnderreplicatedLedger> constructor = 
UnderreplicatedLedger.class.
+                getDeclaredConstructor(long.class);
+        constructor.setAccessible(true);
+        final Queue<String> queue = new LinkedList<String>();
+        queue.add("1111");
+        Iterator<UnderreplicatedLedger> iter =  new 
Iterator<UnderreplicatedLedger>() {
+            @Override
+            public boolean hasNext() {
+                if (queue.size() > 0) {
+                    queue.remove();
+                    try {
+                        curBatch.add(constructor.newInstance(1));
+                        curBatch.add(constructor.newInstance(33232));
+                    } catch (Exception e) {
+                    }
+                }
+
+                if (curBatch.size() > 0) {
+                    return true;
+                }
+                return false;
+            }
+
+            @Override
+            public UnderreplicatedLedger next() {
+                return curBatch.remove();
+            }
+
+            final Queue<UnderreplicatedLedger> curBatch = new 
LinkedList<UnderreplicatedLedger>();
+        };
+
+        
when(underreplicationManager.listLedgersToRereplicate(any())).thenReturn(iter);
+
+        PowerMockito.mockStatic(CommandHelpers.class);
+        PowerMockito.when(CommandHelpers
+                .getBookieSocketAddrStringRepresentation(
+                        eq(bookieId), 
any(BookieAddressResolver.class))).thenReturn("");
+    }
+
+    @Test(timeout = 30000)
+    public void testQueryRecoverStatusCommand() {
+        try {
+            
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103");
+            
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103");
+        } catch (Exception e) {
+        }
+        QueryAutoRecoveryStatusCommand cmd = new 
QueryAutoRecoveryStatusCommand();
+        Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" }));
+    }
+
+    @Test(timeout = 30000)
+    public void testQueryRecoverStatusCommandWithDetail() {
+        try {
+            
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103");
+            
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103");
+        } catch (Exception e) {
+        }
+        QueryAutoRecoveryStatusCommand cmd = new 
QueryAutoRecoveryStatusCommand();
+        Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" }));
+    }
+
+    @Test(timeout = 3000)
+    public void testNoLedgerIsBeingRecovered() {
+        QueryAutoRecoveryStatusCommand cmd = new 
QueryAutoRecoveryStatusCommand();
+        Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" }));
+    }
+}

Reply via email to