This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 97818f5 Add metrics and internal command for QueryAutoRecoveryStatus,
including underReplicatedSize metrics,read/write latency, internal command for
querying recovering ledgersInfo (#2768)
97818f5 is described below
commit 97818f5123999396e66f5246420d3c7e3d25f53d
Author: frankxieke <[email protected]>
AuthorDate: Mon Sep 6 09:07:47 2021 +0800
Add metrics and internal command for QueryAutoRecoveryStatus, including
underReplicatedSize metrics,read/write latency, internal command for querying
recovering ledgersInfo (#2768)
Motivation:
Current AutoRecovery does not have enough metrics or stat command that
would help to monitor and debug. So we need to add metrics and admin stat
interface to monitor the process of AutoRecovery. For example, current
recovering ledgerInfo and under replicated size, read/write latency in
recovering.
Changes:
And QueryAutoRecoveryStatus command and under replicated size metric ,
read/write latency metric in recovering
Documentation:
Need doc.
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 40 +++++
.../client/LedgerFragmentReplicator.java | 31 +++-
.../org/apache/bookkeeper/replication/Auditor.java | 20 +++
.../bookkeeper/replication/ReplicationStats.java | 3 +
.../QueryAutoRecoveryStatusCommand.java | 151 +++++++++++++++++
.../QueryAutoRecoveryStatusCommandTest.java | 179 +++++++++++++++++++++
6 files changed, 420 insertions(+), 4 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index c16efca..461623e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -47,6 +47,7 @@ import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.replication.ReplicationException;
import
org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand;
import
org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand;
+import
org.apache.bookkeeper.tools.cli.commands.autorecovery.QueryAutoRecoveryStatusCommand;
import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand;
import
org.apache.bookkeeper.tools.cli.commands.autorecovery.TriggerAuditCommand;
import
org.apache.bookkeeper.tools.cli.commands.autorecovery.WhoIsAuditorCommand;
@@ -155,6 +156,7 @@ public class BookieShell implements Tool {
static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE =
"convert-to-interleaved-storage";
static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX =
"rebuild-db-ledger-locations-index";
static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE =
"regenerate-interleaved-storage-index-file";
+ static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus";
// cookie commands
static final String CMD_CREATE_COOKIE = "cookie_create";
@@ -1344,6 +1346,43 @@ public class BookieShell implements Tool {
}
}
+
+ /**
+ * Command to query autorecovery status.
+ */
+ class QueryAutoRecoveryStatusCmd extends MyCommand {
+ Options opts = new Options();
+
+ public QueryAutoRecoveryStatusCmd() {
+ super(CMD_QUERY_AUTORECOVERY_STATUS);
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Query the autorecovery status";
+ }
+
+ @Override
+ String getUsage() {
+ return "queryautorecoverystatus";
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ final boolean verbose = cmdLine.hasOption("verbose");
+ QueryAutoRecoveryStatusCommand.QFlags flags = new
QueryAutoRecoveryStatusCommand.QFlags()
+ .verbose(verbose);
+ QueryAutoRecoveryStatusCommand cmd = new
QueryAutoRecoveryStatusCommand();
+ cmd.apply(bkConf, flags);
+ return 0;
+ }
+ }
+
/**
* Setter and Getter for LostBookieRecoveryDelay value (in seconds) in
metadata store.
*/
@@ -2153,6 +2192,7 @@ public class BookieShell implements Tool {
commands.put(CMD_READJOURNAL, new ReadJournalCmd());
commands.put(CMD_LASTMARK, new LastMarkCmd());
commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd());
+ commands.put(CMD_QUERY_AUTORECOVERY_STATUS, new
QueryAutoRecoveryStatusCmd());
commands.put(CMD_LISTBOOKIES, new ListBookiesCmd());
commands.put(CMD_LISTFILESONDISC, new ListDiskFilesCmd());
commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index cd8b508..fb6259d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -24,10 +24,10 @@ import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_READ;
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN;
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ;
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN;
-import static
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
-
+import static
org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;;
+import static
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;;
+import static
org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;
import io.netty.buffer.Unpooled;
-
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
@@ -35,11 +35,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
-
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.meta.LedgerManager;
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.MathUtils;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +91,17 @@ public class LedgerFragmentReplicator {
help = "The distribution of size of entries written by the replicator"
)
private final OpStatsLogger numBytesWritten;
+ @StatsDoc(
+ name = READ_DATA_LATENCY,
+ help = "The distribution of latency of read entries by the
replicator"
+ )
+ private final OpStatsLogger readDataLatency;
+ @StatsDoc(
+ name = WRITE_DATA_LATENCY,
+ help = "The distribution of latency of write entries by the
replicator"
+ )
+ private final OpStatsLogger writeDataLatency;
+
public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) {
this.bkc = bkc;
@@ -98,6 +110,8 @@ public class LedgerFragmentReplicator {
numBytesRead = this.statsLogger.getOpStatsLogger(NUM_BYTES_READ);
numEntriesWritten = this.statsLogger.getCounter(NUM_ENTRIES_WRITTEN);
numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN);
+ readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY);
+ writeDataLatency =
this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY);
}
public LedgerFragmentReplicator(BookKeeper bkc) {
@@ -334,6 +348,8 @@ public class LedgerFragmentReplicator {
}
}
};
+
+ long startReadEntryTime = MathUtils.nowInNano();
/*
* Read the ledger entry using the LedgerHandle. This will allow us to
* read the entry from one of the other replicated bookies other than
@@ -350,6 +366,10 @@ public class LedgerFragmentReplicator {
ledgerFragmentEntryMcb.processResult(rc, null, null);
return;
}
+
+
readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime),
+ TimeUnit.NANOSECONDS);
+
/*
* Now that we've read the ledger entry, write it to the new
* bookie we've selected.
@@ -364,10 +384,13 @@ public class LedgerFragmentReplicator {
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
for (BookieId newBookie : newBookies) {
+ long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entryId,
ByteBufList.clone(toSend),
multiWriteCallback, dataLength,
BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
+ writeDataLatency.registerSuccessfulEvent(
+ MathUtils.elapsedNanos(startWriteEntryTime),
TimeUnit.NANOSECONDS);
}
toSend.release();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index f245614..064c7fc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -38,6 +38,7 @@ import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLIC
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
import static
org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static
org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
+import static
org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE;
import static
org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
@@ -71,6 +72,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -170,6 +172,12 @@ public class Auditor implements AutoCloseable {
help = "the distribution of num under_replicated ledgers on each
auditor run"
)
private final OpStatsLogger numUnderReplicatedLedger;
+
+ @StatsDoc(
+ name = UNDER_REPLICATED_LEDGERS_TOTAL_SIZE,
+ help = "the distribution of under_replicated ledgers total size on
each auditor run"
+ )
+ private final OpStatsLogger underReplicatedLedgerTotalSize;
@StatsDoc(
name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE,
help = "the latency distribution of publishing under replicated
ledgers for lost bookies"
@@ -341,6 +349,7 @@ public class Auditor implements AutoCloseable {
this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new
AtomicInteger(0);
numUnderReplicatedLedger =
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
+ underReplicatedLedgerTotalSize =
this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
uRLPublishTimeForLostBookies = this.statsLogger
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
bookieToLedgersMapCreationTime = this.statsLogger
@@ -1131,6 +1140,17 @@ public class Auditor implements AutoCloseable {
}
LOG.info("Following ledgers: {} of bookie: {} are identified as
underreplicated", ledgers, missingBookies);
numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
+ LongAdder underReplicatedSize = new LongAdder();
+ FutureUtils.processList(
+ Lists.newArrayList(ledgers),
+ ledgerId ->
+
ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception)
-> {
+ if (exception == null) {
+
underReplicatedSize.add(metadata.getValue().getLength());
+ }
+ }), null);
+
underReplicatedLedgerTotalSize.registerSuccessfulValue(underReplicatedSize.longValue());
+
return FutureUtils.processList(
Lists.newArrayList(ledgers),
ledgerId ->
ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId,
missingBookies),
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index 6ec9f49..b0bbe47 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -30,6 +30,7 @@ public interface ReplicationStats {
String AUDITOR_SCOPE = "auditor";
String ELECTION_ATTEMPTS = "election_attempts";
String NUM_UNDER_REPLICATED_LEDGERS = "NUM_UNDER_REPLICATED_LEDGERS";
+ String UNDER_REPLICATED_LEDGERS_TOTAL_SIZE =
"UNDER_REPLICATED_LEDGERS_TOTAL_SIZE";
String URL_PUBLISH_TIME_FOR_LOST_BOOKIE =
"URL_PUBLISH_TIME_FOR_LOST_BOOKIE";
String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME =
"BOOKIE_TO_LEDGERS_MAP_CREATION_TIME";
String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME";
@@ -58,6 +59,8 @@ public interface ReplicationStats {
String NUM_BYTES_READ = "NUM_BYTES_READ";
String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
+ String READ_DATA_LATENCY = "READ_DATA_LATENCY";
+ String WRITE_DATA_LATENCY = "WRITE_DATA_LATENCY";
String REPLICATE_EXCEPTION = "exceptions";
String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER =
"NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION =
"NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
new file mode 100644
index 0000000..0f86a2d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.autorecovery;
+
+import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Command to Query current auto recovery status.
+ */
+public class QueryAutoRecoveryStatusCommand
+ extends BookieCommand<QueryAutoRecoveryStatusCommand.QFlags> {
+ static final Logger LOG = LoggerFactory.
+ getLogger(QueryAutoRecoveryStatusCommand.class);
+ private static final String NAME = "queryautorecoverystatus";
+ private static final String DESC = "Query autorecovery status.";
+
+ public QueryAutoRecoveryStatusCommand() {
+ super(CliSpec.<QFlags>newBuilder()
+ .withName(NAME)
+ .withDescription(DESC)
+ .withFlags(new QFlags())
+ .build());
+ }
+
+ @Override
+ public boolean apply(ServerConfiguration conf, QFlags cmdFlags) {
+ try {
+ return handler(conf, cmdFlags);
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Flags for list under replicated command.
+ */
+ @Accessors(fluent = true)
+ @Setter
+ public static class QFlags extends CliFlags{
+ @Parameter(names = {"-v", "--verbose"}, description = "list
recovering detailed ledger info")
+ private Boolean verbose = false;
+ }
+
+ private static class LedgerRecoverInfo {
+ Long ledgerId;
+ String bookieId;
+ LedgerRecoverInfo(Long ledgerId, String bookieId) {
+ this.ledgerId = ledgerId;
+ this.bookieId = bookieId;
+ }
+ }
+
+ /*
+ Print Message format is like this:
+
+ CurrentRecoverLedgerInfo:
+ LedgerId: BookieId: LedgerSize:(detail)
+ LedgerId: BookieId: LedgerSize:(detail)
+ */
+ public boolean handler(ServerConfiguration conf, QFlags flag) throws
Exception {
+ runFunctionWithLedgerManagerFactory(conf, mFactory -> {
+ LedgerUnderreplicationManager underreplicationManager;
+ LedgerManager ledgerManager = mFactory.newLedgerManager();
+ List<LedgerRecoverInfo> ledgerList = new LinkedList<>();
+ try {
+ underreplicationManager =
mFactory.newLedgerUnderreplicationManager();
+ } catch (KeeperException |
ReplicationException.CompatibilityException e) {
+ throw new UncheckedExecutionException("Failed to new ledger
underreplicated manager", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedExecutionException("Interrupted on newing
ledger underreplicated manager", e);
+ }
+ Iterator<UnderreplicatedLedger> iter =
underreplicationManager.listLedgersToRereplicate(null);
+ while (iter.hasNext()) {
+ UnderreplicatedLedger underreplicatedLedger = iter.next();
+ long urLedgerId = underreplicatedLedger.getLedgerId();
+ try {
+ String replicationWorkerId = underreplicationManager
+
.getReplicationWorkerIdRereplicatingLedger(urLedgerId);
+ if (replicationWorkerId != null) {
+ ledgerList.add(new LedgerRecoverInfo(urLedgerId,
replicationWorkerId));
+ }
+ } catch (ReplicationException.UnavailableException e) {
+ LOG.error("Failed to get ReplicationWorkerId rereplicating
ledger {} -- {}", urLedgerId,
+ e.getMessage());
+ }
+ }
+
+ LOG.info("CurrentRecoverLedgerInfo:");
+ if (!flag.verbose) {
+ for (int i = 0; i < ledgerList.size(); i++) {
+ LOG.info("\tLedgerId:{}\tBookieId:{}",
ledgerList.get(i).ledgerId, ledgerList.get(i).bookieId);
+ }
+ } else {
+ for (int i = 0; i < ledgerList.size(); i++) {
+ LedgerRecoverInfo info = ledgerList.get(i);
+
ledgerManager.readLedgerMetadata(info.ledgerId).whenComplete((metadata,
exception) -> {
+ if (exception == null) {
+
LOG.info("\tLedgerId:{}\tBookieId:{}\tLedgerSize:{}",
+ info.ledgerId, info.bookieId,
metadata.getValue().getLength());
+ } else {
+ LOG.error("Unable to read the ledger: {}
information", info.ledgerId);
+ throw new UncheckedExecutionException(exception);
+ }
+ });
+ }
+ }
+ if (ledgerList.size() == 0) {
+ // NO ledger is being auto recovering
+ LOG.info("\t No Ledger is being recovered.");
+ }
+ return null;
+ });
+ return true;
+ }
+}
diff --git
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java
new file mode 100644
index 0000000..ba92c8b
--- /dev/null
+++
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.autorecovery;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import com.google.common.collect.Lists;
+import java.lang.reflect.Constructor;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
+import org.apache.bookkeeper.tools.cli.helpers.CommandHelpers;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+
+
+/**
+ * Unit test for {@link QueryAutoRecoveryStatusCommand}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ QueryAutoRecoveryStatusCommand.class,
ZKMetadataDriverBase.class, ZooKeeperClient.class,
+ CommandHelpers.class, MetadataDrivers.class
+})
+public class QueryAutoRecoveryStatusCommandTest extends BookieCommandTestBase {
+ public QueryAutoRecoveryStatusCommandTest() {
+ super(3, 0);
+ }
+ LedgerUnderreplicationManager underreplicationManager;
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ BookieId bookieId = BookieId.parse(UUID.randomUUID().toString());
+ LedgerManagerFactory ledgerManagerFactory =
mock(LedgerManagerFactory.class);
+
+ PowerMockito.mockStatic(MetadataDrivers.class);
+ PowerMockito.doAnswer(invocationOnMock -> {
+ Function<LedgerManagerFactory, ?> function =
invocationOnMock.getArgument(1);
+ function.apply(ledgerManagerFactory);
+ return true;
+ }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory",
any(ServerConfiguration.class),
+ any(Function.class));
+
+ LedgerManager ledgerManager = mock(LedgerManager.class);
+ underreplicationManager = mock(LedgerUnderreplicationManager.class);
+
+
when(ledgerManagerFactory.newLedgerManager()).thenReturn(ledgerManager);
+
when(ledgerManagerFactory.newLedgerUnderreplicationManager()).thenReturn(underreplicationManager);
+
+ List<BookieId> ensemble = Lists.newArrayList(new
BookieSocketAddress("192.0.2.1", 1234).toBookieId(),
+ new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
+ new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
+ LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ .withId(11112233)
+ .withClosedState()
+ .withLength(100000999)
+ .withLastEntryId(2000011)
+
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2)
+ .withPassword("passwd".getBytes())
+ .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType())
+ .newEnsembleEntry(0L, ensemble).build();
+ CompletableFuture<Versioned<LedgerMetadata>> promise = new
CompletableFuture<>();
+ Versioned<LedgerMetadata> vmeta = new
Versioned<LedgerMetadata>(metadata, new LongVersion(1000));
+ promise.complete(vmeta);
+
+ when(ledgerManager.readLedgerMetadata(1)).thenReturn(promise);
+ when(ledgerManager.readLedgerMetadata(33232)).thenReturn(promise);
+
+ Constructor<? extends UnderreplicatedLedger> constructor =
UnderreplicatedLedger.class.
+ getDeclaredConstructor(long.class);
+ constructor.setAccessible(true);
+ final Queue<String> queue = new LinkedList<String>();
+ queue.add("1111");
+ Iterator<UnderreplicatedLedger> iter = new
Iterator<UnderreplicatedLedger>() {
+ @Override
+ public boolean hasNext() {
+ if (queue.size() > 0) {
+ queue.remove();
+ try {
+ curBatch.add(constructor.newInstance(1));
+ curBatch.add(constructor.newInstance(33232));
+ } catch (Exception e) {
+ }
+ }
+
+ if (curBatch.size() > 0) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public UnderreplicatedLedger next() {
+ return curBatch.remove();
+ }
+
+ final Queue<UnderreplicatedLedger> curBatch = new
LinkedList<UnderreplicatedLedger>();
+ };
+
+
when(underreplicationManager.listLedgersToRereplicate(any())).thenReturn(iter);
+
+ PowerMockito.mockStatic(CommandHelpers.class);
+ PowerMockito.when(CommandHelpers
+ .getBookieSocketAddrStringRepresentation(
+ eq(bookieId),
any(BookieAddressResolver.class))).thenReturn("");
+ }
+
+ @Test(timeout = 30000)
+ public void testQueryRecoverStatusCommand() {
+ try {
+
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103");
+
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103");
+ } catch (Exception e) {
+ }
+ QueryAutoRecoveryStatusCommand cmd = new
QueryAutoRecoveryStatusCommand();
+ Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" }));
+ }
+
+ @Test(timeout = 30000)
+ public void testQueryRecoverStatusCommandWithDetail() {
+ try {
+
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103");
+
when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103");
+ } catch (Exception e) {
+ }
+ QueryAutoRecoveryStatusCommand cmd = new
QueryAutoRecoveryStatusCommand();
+ Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" }));
+ }
+
+ @Test(timeout = 3000)
+ public void testNoLedgerIsBeingRecovered() {
+ QueryAutoRecoveryStatusCommand cmd = new
QueryAutoRecoveryStatusCommand();
+ Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" }));
+ }
+}