This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6489a7d419 Fix numLedgersReplicated metric (#3654)
6489a7d419 is described below
commit 6489a7d4193ca96379578a8c82b14e780154c6ef
Author: wenbingshen <[email protected]>
AuthorDate: Wed Feb 15 15:13:39 2023 +0800
Fix numLedgersReplicated metric (#3654)
### Motivation
While adding a metric for the number of replicated ledgers that do not adhere to
the placement policy for RW (#3652), I found the numLedgersReplicated statistic to be inaccurate.
When no target bookie can be found to replicate the ledger to, the
numLedgersReplicated metric is still incremented.

---
.../apache/bookkeeper/client/BookKeeperAdmin.java | 2 +-
.../replication/TestReplicationWorker.java | 64 ++++++++++++++++++++--
2 files changed, 61 insertions(+), 5 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 42178156fa..f9ac460bde 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1145,7 +1145,7 @@ public class BookKeeperAdmin implements AutoCloseable {
if (MapUtils.isEmpty(targetBookieAddresses)) {
LOG.warn("Could not replicate for {} ledger: {}, not find target
bookie.",
ledgerFragment.getReplicateType(),
ledgerFragment.getLedgerId());
- return;
+ throw new BKException.BKLedgerRecoveryException();
}
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses,
onReadEntryFailureCallback);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 9982dd506a..88e05b995b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.replication;
import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
+import static
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_SCOPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -31,6 +32,8 @@ import static org.junit.Assert.fail;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Enumeration;
@@ -44,6 +47,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.bookie.BookieImpl;
@@ -1156,11 +1160,48 @@ public class TestReplicationWorker extends
BookKeeperClusterTestCase {
@Test
public void testRepairedNotAdheringPlacementPolicyLedgerFragmentsOnRack()
throws Exception {
-
testRepairedNotAdheringPlacementPolicyLedgerFragments(RackawareEnsemblePlacementPolicy.class);
+
testRepairedNotAdheringPlacementPolicyLedgerFragments(RackawareEnsemblePlacementPolicy.class,
null);
+ }
+
+ @Test
+ public void testReplicationStats() throws Exception {
+ BiConsumer<Boolean, ReplicationWorker> checkReplicationStats = (first,
rw) -> {
+ try {
+ final Method rereplicate =
rw.getClass().getDeclaredMethod("rereplicate");
+ rereplicate.setAccessible(true);
+ final Object result = rereplicate.invoke(rw);
+ final Field statsLoggerField =
rw.getClass().getDeclaredField("statsLogger");
+ statsLoggerField.setAccessible(true);
+ final TestStatsLogger statsLogger = (TestStatsLogger)
statsLoggerField.get(rw);
+
+ final Counter numDeferLedgerLockReleaseOfFailedLedgerCounter =
+
statsLogger.getCounter(ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
+ final Counter numLedgersReplicatedCounter =
+
statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+
+ assertEquals("NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER",
+ 1,
numDeferLedgerLockReleaseOfFailedLedgerCounter.get().longValue());
+
+ if (first) {
+ assertFalse((boolean) result);
+ assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
+ 0, numLedgersReplicatedCounter.get().longValue());
+ } else {
+ assertTrue((boolean) result);
+ assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
+ 1, numLedgersReplicatedCounter.get().longValue());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ testRepairedNotAdheringPlacementPolicyLedgerFragments(
+ RackawareEnsemblePlacementPolicy.class, checkReplicationStats);
}
private void testRepairedNotAdheringPlacementPolicyLedgerFragments(
- Class<? extends EnsemblePlacementPolicy> placementPolicyClass)
throws Exception {
+ Class<? extends EnsemblePlacementPolicy> placementPolicyClass,
+ BiConsumer<Boolean, ReplicationWorker> checkReplicationStats)
throws Exception {
List<BookieId> firstThreeBookies = servers.stream().map(ele -> {
try {
return ele.getServer().getBookieId();
@@ -1244,12 +1285,23 @@ public class TestReplicationWorker extends
BookKeeperClusterTestCase {
return ensemblePlacementPolicy;
}
};
- ReplicationWorker rw = new ReplicationWorker(baseConf, bookKeeper,
false, NullStatsLogger.INSTANCE);
- rw.start();
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger =
statsProvider.getStatsLogger(REPLICATION_SCOPE);
+ ReplicationWorker rw = new ReplicationWorker(baseConf, bookKeeper,
false, statsLogger);
+
+ if (checkReplicationStats != null) {
+ checkReplicationStats.accept(true, rw);
+ } else {
+ rw.start();
+ }
//start new bookie, the rack is /rack2
BookieId newBookieId = startNewBookieAndReturnBookieId();
+ if (checkReplicationStats != null) {
+ checkReplicationStats.accept(false, rw);
+ }
+
Awaitility.await().untilAsserted(() -> {
LedgerMetadata metadata =
bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
List<BookieId> newBookies = metadata.getAllEnsembles().get(0L);
@@ -1267,6 +1319,10 @@ public class TestReplicationWorker extends
BookKeeperClusterTestCase {
}
verifyRecoveredLedgers(lh, 0, entrySize - 1);
+
+ if (checkReplicationStats == null) {
+ rw.shutdown();
+ }
}
private EnsemblePlacementPolicy
buildRackAwareEnsemblePlacementPolicy(List<BookieId> bookieIds) {