This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 1455250eeb0a74d18be68699ce1832e2345d900a Author: frankxieke <[email protected]> AuthorDate: Mon Sep 6 09:07:47 2021 +0800 Add metrics and internal command for QueryAutoRecoveryStatus, including underReplicatedSize metrics,read/write latency, internal command for querying recovering ledgersInfo (#2768) Motivation: Current AutoRecovery does not have enough metrics or stat command that would help to monitor and debug. So we need to add metrics and admin stat interface to monitor the process of AutoRecovery. For example, current recovering ledgerInfo and under replicated size, read/write latency in recovering. Changes: Add QueryAutoRecoveryStatus command, under replicated size metric, and read/write latency metrics in recovering Documentation: Need doc. (cherry picked from commit 97818f5123999396e66f5246420d3c7e3d25f53d) --- .../org/apache/bookkeeper/bookie/BookieShell.java | 40 +++++ .../client/LedgerFragmentReplicator.java | 31 +++- .../org/apache/bookkeeper/replication/Auditor.java | 20 +++ .../bookkeeper/replication/ReplicationStats.java | 3 + .../QueryAutoRecoveryStatusCommand.java | 151 +++++++++++++++++ .../QueryAutoRecoveryStatusCommandTest.java | 179 +++++++++++++++++++++ 6 files changed, 420 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 0da44a7..6bfbbaa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand; +import 
org.apache.bookkeeper.tools.cli.commands.autorecovery.QueryAutoRecoveryStatusCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.TriggerAuditCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.WhoIsAuditorCommand; @@ -155,6 +156,7 @@ public class BookieShell implements Tool { static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = "convert-to-interleaved-storage"; static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = "rebuild-db-ledger-locations-index"; static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file"; + static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus"; // cookie commands static final String CMD_CREATE_COOKIE = "cookie_create"; @@ -1344,6 +1346,43 @@ public class BookieShell implements Tool { } } + + /** + * Command to query autorecovery status. + */ + class QueryAutoRecoveryStatusCmd extends MyCommand { + Options opts = new Options(); + + public QueryAutoRecoveryStatusCmd() { + super(CMD_QUERY_AUTORECOVERY_STATUS); + } + + @Override + Options getOptions() { + return opts; + } + + @Override + String getDescription() { + return "Query the autorecovery status"; + } + + @Override + String getUsage() { + return "queryautorecoverystatus"; + } + + @Override + int runCmd(CommandLine cmdLine) throws Exception { + final boolean verbose = cmdLine.hasOption("verbose"); + QueryAutoRecoveryStatusCommand.QFlags flags = new QueryAutoRecoveryStatusCommand.QFlags() + .verbose(verbose); + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + cmd.apply(bkConf, flags); + return 0; + } + } + /** * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in metadata store. 
*/ @@ -2153,6 +2192,7 @@ public class BookieShell implements Tool { commands.put(CMD_READJOURNAL, new ReadJournalCmd()); commands.put(CMD_LASTMARK, new LastMarkCmd()); commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd()); + commands.put(CMD_QUERY_AUTORECOVERY_STATUS, new QueryAutoRecoveryStatusCmd()); commands.put(CMD_LISTBOOKIES, new ListBookiesCmd()); commands.put(CMD_LISTFILESONDISC, new ListDiskFilesCmd()); commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index cd8b508..fb6259d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -24,10 +24,10 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_READ; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN; -import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE; - +import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;; +import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;; +import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY; import io.netty.buffer.Unpooled; - import java.util.Enumeration; import java.util.HashSet; import java.util.Iterator; @@ -35,11 +35,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.function.BiConsumer; import java.util.stream.Collectors; - import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.meta.LedgerManager; @@ -53,6 +53,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.annotations.StatsDoc; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.MathUtils; import org.apache.zookeeper.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +91,17 @@ public class LedgerFragmentReplicator { help = "The distribution of size of entries written by the replicator" ) private final OpStatsLogger numBytesWritten; + @StatsDoc( + name = READ_DATA_LATENCY, + help = "The distribution of latency of read entries by the replicator" + ) + private final OpStatsLogger readDataLatency; + @StatsDoc( + name = WRITE_DATA_LATENCY, + help = "The distribution of latency of write entries by the replicator" + ) + private final OpStatsLogger writeDataLatency; + public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) { this.bkc = bkc; @@ -98,6 +110,8 @@ public class LedgerFragmentReplicator { numBytesRead = this.statsLogger.getOpStatsLogger(NUM_BYTES_READ); numEntriesWritten = this.statsLogger.getCounter(NUM_ENTRIES_WRITTEN); numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN); + readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY); + writeDataLatency = this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY); } public LedgerFragmentReplicator(BookKeeper bkc) { @@ -334,6 +348,8 @@ public class LedgerFragmentReplicator { } } }; + + long startReadEntryTime = MathUtils.nowInNano(); /* * Read the ledger entry using the LedgerHandle. 
This will allow us to * read the entry from one of the other replicated bookies other than @@ -350,6 +366,10 @@ public class LedgerFragmentReplicator { ledgerFragmentEntryMcb.processResult(rc, null, null); return; } + + readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime), + TimeUnit.NANOSECONDS); + /* * Now that we've read the ledger entry, write it to the new * bookie we've selected. @@ -364,10 +384,13 @@ public class LedgerFragmentReplicator { lh.getLastAddConfirmed(), entry.getLength(), Unpooled.wrappedBuffer(data, 0, data.length)); for (BookieId newBookie : newBookies) { + long startWriteEntryTime = MathUtils.nowInNano(); bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, ByteBufList.clone(toSend), multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, false, WriteFlag.NONE); + writeDataLatency.registerSuccessfulEvent( + MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS); } toSend.release(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 48f5101..e4cf394 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -38,6 +38,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLIC import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS; import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME; +import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE; import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE; import static 
org.apache.bookkeeper.util.SafeRunnable.safeRun; @@ -71,6 +72,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -170,6 +172,12 @@ public class Auditor implements AutoCloseable { help = "the distribution of num under_replicated ledgers on each auditor run" ) private final OpStatsLogger numUnderReplicatedLedger; + + @StatsDoc( + name = UNDER_REPLICATED_LEDGERS_TOTAL_SIZE, + help = "the distribution of under_replicated ledgers total size on each auditor run" + ) + private final OpStatsLogger underReplicatedLedgerTotalSize; @StatsDoc( name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE, help = "the latency distribution of publishing under replicated ledgers for lost bookies" @@ -341,6 +349,7 @@ public class Auditor implements AutoCloseable { this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0); numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS); + underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE); uRLPublishTimeForLostBookies = this.statsLogger .getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE); bookieToLedgersMapCreationTime = this.statsLogger @@ -1131,6 +1140,17 @@ public class Auditor implements AutoCloseable { } LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies); numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size()); + LongAdder underReplicatedSize = new LongAdder(); + FutureUtils.processList( + Lists.newArrayList(ledgers), + ledgerId -> + ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> { + if (exception == null) { + underReplicatedSize.add(metadata.getValue().getLength()); + } + }), 
null); + underReplicatedLedgerTotalSize.registerSuccessfulValue(underReplicatedSize.longValue()); + return FutureUtils.processList( Lists.newArrayList(ledgers), ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies), diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index 6ec9f49..b0bbe47 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -30,6 +30,7 @@ public interface ReplicationStats { String AUDITOR_SCOPE = "auditor"; String ELECTION_ATTEMPTS = "election_attempts"; String NUM_UNDER_REPLICATED_LEDGERS = "NUM_UNDER_REPLICATED_LEDGERS"; + String UNDER_REPLICATED_LEDGERS_TOTAL_SIZE = "UNDER_REPLICATED_LEDGERS_TOTAL_SIZE"; String URL_PUBLISH_TIME_FOR_LOST_BOOKIE = "URL_PUBLISH_TIME_FOR_LOST_BOOKIE"; String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = "BOOKIE_TO_LEDGERS_MAP_CREATION_TIME"; String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME"; @@ -58,6 +59,8 @@ public interface ReplicationStats { String NUM_BYTES_READ = "NUM_BYTES_READ"; String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN"; String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN"; + String READ_DATA_LATENCY = "READ_DATA_LATENCY"; + String WRITE_DATA_LATENCY = "WRITE_DATA_LATENCY"; String REPLICATE_EXCEPTION = "exceptions"; String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER"; String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java new file mode 
100644 index 0000000..0f86a2d --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory; +import com.beust.jcommander.Parameter; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; +import org.apache.bookkeeper.tools.framework.CliFlags; +import org.apache.bookkeeper.tools.framework.CliSpec; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Command to Query current auto recovery status. + */ +public class QueryAutoRecoveryStatusCommand + extends BookieCommand<QueryAutoRecoveryStatusCommand.QFlags> { + static final Logger LOG = LoggerFactory. + getLogger(QueryAutoRecoveryStatusCommand.class); + private static final String NAME = "queryautorecoverystatus"; + private static final String DESC = "Query autorecovery status."; + + public QueryAutoRecoveryStatusCommand() { + super(CliSpec.<QFlags>newBuilder() + .withName(NAME) + .withDescription(DESC) + .withFlags(new QFlags()) + .build()); + } + + @Override + public boolean apply(ServerConfiguration conf, QFlags cmdFlags) { + try { + return handler(conf, cmdFlags); + } catch (Exception e) { + throw new UncheckedExecutionException(e.getMessage(), e); + } + } + + /** + * Flags for list under replicated command. 
+ */ + @Accessors(fluent = true) + @Setter + public static class QFlags extends CliFlags{ + @Parameter(names = {"-v", "--verbose"}, description = "list recovering detailed ledger info") + private Boolean verbose = false; + } + + private static class LedgerRecoverInfo { + Long ledgerId; + String bookieId; + LedgerRecoverInfo(Long ledgerId, String bookieId) { + this.ledgerId = ledgerId; + this.bookieId = bookieId; + } + } + + /* + Print Message format is like this: + + CurrentRecoverLedgerInfo: + LedgerId: BookieId: LedgerSize:(detail) + LedgerId: BookieId: LedgerSize:(detail) + */ + public boolean handler(ServerConfiguration conf, QFlags flag) throws Exception { + runFunctionWithLedgerManagerFactory(conf, mFactory -> { + LedgerUnderreplicationManager underreplicationManager; + LedgerManager ledgerManager = mFactory.newLedgerManager(); + List<LedgerRecoverInfo> ledgerList = new LinkedList<>(); + try { + underreplicationManager = mFactory.newLedgerUnderreplicationManager(); + } catch (KeeperException | ReplicationException.CompatibilityException e) { + throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e); + } + Iterator<UnderreplicatedLedger> iter = underreplicationManager.listLedgersToRereplicate(null); + while (iter.hasNext()) { + UnderreplicatedLedger underreplicatedLedger = iter.next(); + long urLedgerId = underreplicatedLedger.getLedgerId(); + try { + String replicationWorkerId = underreplicationManager + .getReplicationWorkerIdRereplicatingLedger(urLedgerId); + if (replicationWorkerId != null) { + ledgerList.add(new LedgerRecoverInfo(urLedgerId, replicationWorkerId)); + } + } catch (ReplicationException.UnavailableException e) { + LOG.error("Failed to get ReplicationWorkerId rereplicating ledger {} -- {}", urLedgerId, + e.getMessage()); + } + } + + 
LOG.info("CurrentRecoverLedgerInfo:"); + if (!flag.verbose) { + for (int i = 0; i < ledgerList.size(); i++) { + LOG.info("\tLedgerId:{}\tBookieId:{}", ledgerList.get(i).ledgerId, ledgerList.get(i).bookieId); + } + } else { + for (int i = 0; i < ledgerList.size(); i++) { + LedgerRecoverInfo info = ledgerList.get(i); + ledgerManager.readLedgerMetadata(info.ledgerId).whenComplete((metadata, exception) -> { + if (exception == null) { + LOG.info("\tLedgerId:{}\tBookieId:{}\tLedgerSize:{}", + info.ledgerId, info.bookieId, metadata.getValue().getLength()); + } else { + LOG.error("Unable to read the ledger: {} information", info.ledgerId); + throw new UncheckedExecutionException(exception); + } + }); + } + } + if (ledgerList.size() == 0) { + // NO ledger is being auto recovering + LOG.info("\t No Ledger is being recovered."); + } + return null; + }); + return true; + } +} diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java new file mode 100644 index 0000000..ba92c8b --- /dev/null +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import com.google.common.collect.Lists; +import java.lang.reflect.Constructor; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerMetadataBuilder; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookieAddressResolver; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; +import org.apache.bookkeeper.tools.cli.helpers.CommandHelpers; +import org.apache.bookkeeper.versioning.LongVersion; +import org.apache.bookkeeper.versioning.Versioned; +import 
org.apache.bookkeeper.zookeeper.ZooKeeperClient; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + + + +/** + * Unit test for {@link QueryAutoRecoveryStatusCommand}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ QueryAutoRecoveryStatusCommand.class, ZKMetadataDriverBase.class, ZooKeeperClient.class, + CommandHelpers.class, MetadataDrivers.class +}) +public class QueryAutoRecoveryStatusCommandTest extends BookieCommandTestBase { + public QueryAutoRecoveryStatusCommandTest() { + super(3, 0); + } + LedgerUnderreplicationManager underreplicationManager; + + @Override + public void setup() throws Exception { + super.setup(); + BookieId bookieId = BookieId.parse(UUID.randomUUID().toString()); + LedgerManagerFactory ledgerManagerFactory = mock(LedgerManagerFactory.class); + + PowerMockito.mockStatic(MetadataDrivers.class); + PowerMockito.doAnswer(invocationOnMock -> { + Function<LedgerManagerFactory, ?> function = invocationOnMock.getArgument(1); + function.apply(ledgerManagerFactory); + return true; + }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory", any(ServerConfiguration.class), + any(Function.class)); + + LedgerManager ledgerManager = mock(LedgerManager.class); + underreplicationManager = mock(LedgerUnderreplicationManager.class); + + when(ledgerManagerFactory.newLedgerManager()).thenReturn(ledgerManager); + when(ledgerManagerFactory.newLedgerUnderreplicationManager()).thenReturn(underreplicationManager); + + List<BookieId> ensemble = Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234).toBookieId(), + new BookieSocketAddress("192.0.2.2", 1234).toBookieId(), + new BookieSocketAddress("192.0.2.3", 1234).toBookieId()); + LedgerMetadata metadata = LedgerMetadataBuilder.create() + .withId(11112233) + .withClosedState() + 
.withLength(100000999) + .withLastEntryId(2000011) + .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2) + .withPassword("passwd".getBytes()) + .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType()) + .newEnsembleEntry(0L, ensemble).build(); + CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); + Versioned<LedgerMetadata> vmeta = new Versioned<LedgerMetadata>(metadata, new LongVersion(1000)); + promise.complete(vmeta); + + when(ledgerManager.readLedgerMetadata(1)).thenReturn(promise); + when(ledgerManager.readLedgerMetadata(33232)).thenReturn(promise); + + Constructor<? extends UnderreplicatedLedger> constructor = UnderreplicatedLedger.class. + getDeclaredConstructor(long.class); + constructor.setAccessible(true); + final Queue<String> queue = new LinkedList<String>(); + queue.add("1111"); + Iterator<UnderreplicatedLedger> iter = new Iterator<UnderreplicatedLedger>() { + @Override + public boolean hasNext() { + if (queue.size() > 0) { + queue.remove(); + try { + curBatch.add(constructor.newInstance(1)); + curBatch.add(constructor.newInstance(33232)); + } catch (Exception e) { + } + } + + if (curBatch.size() > 0) { + return true; + } + return false; + } + + @Override + public UnderreplicatedLedger next() { + return curBatch.remove(); + } + + final Queue<UnderreplicatedLedger> curBatch = new LinkedList<UnderreplicatedLedger>(); + }; + + when(underreplicationManager.listLedgersToRereplicate(any())).thenReturn(iter); + + PowerMockito.mockStatic(CommandHelpers.class); + PowerMockito.when(CommandHelpers + .getBookieSocketAddrStringRepresentation( + eq(bookieId), any(BookieAddressResolver.class))).thenReturn(""); + } + + @Test(timeout = 30000) + public void testQueryRecoverStatusCommand() { + try { + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103"); + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103"); + } catch 
(Exception e) { + } + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" })); + } + + @Test(timeout = 30000) + public void testQueryRecoverStatusCommandWithDetail() { + try { + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103"); + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103"); + } catch (Exception e) { + } + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" })); + } + + @Test(timeout = 3000) + public void testNoLedgerIsBeingRecovered() { + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" })); + } +}
