This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6489a7d419 Fix numLedgersReplicated metric (#3654)
6489a7d419 is described below

commit 6489a7d4193ca96379578a8c82b14e780154c6ef
Author: wenbingshen <[email protected]>
AuthorDate: Wed Feb 15 15:13:39 2023 +0800

    Fix numLedgersReplicated metric (#3654)
    
    ### Motivation
    
    When I want to add num not adhering placement ledgers replicated metric for 
RW(#3652), I found the numLedgersReplicated statistic to be inaccurate.
    
    When we could not find a target bookie to replicate the ledger, the 
numLedgersReplicated metric will still increase.
    
![image](https://user-images.githubusercontent.com/35599757/202638656-58f974f6-9ffc-4cfe-9ac7-85962f941de7.png)
---
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  2 +-
 .../replication/TestReplicationWorker.java         | 64 ++++++++++++++++++++--
 2 files changed, 61 insertions(+), 5 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 42178156fa..f9ac460bde 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1145,7 +1145,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         if (MapUtils.isEmpty(targetBookieAddresses)) {
             LOG.warn("Could not replicate for {} ledger: {}, not find target 
bookie.",
                     ledgerFragment.getReplicateType(), 
ledgerFragment.getLedgerId());
-            return;
+            throw new BKException.BKLedgerRecoveryException();
         }
         replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, 
onReadEntryFailureCallback);
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 9982dd506a..88e05b995b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.replication;
 
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
+import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_SCOPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -31,6 +32,8 @@ import static org.junit.Assert.fail;
 
 import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.Enumeration;
@@ -44,6 +47,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.bookie.BookieImpl;
@@ -1156,11 +1160,48 @@ public class TestReplicationWorker extends 
BookKeeperClusterTestCase {
 
     @Test
     public void testRepairedNotAdheringPlacementPolicyLedgerFragmentsOnRack() 
throws Exception {
-        
testRepairedNotAdheringPlacementPolicyLedgerFragments(RackawareEnsemblePlacementPolicy.class);
+        
testRepairedNotAdheringPlacementPolicyLedgerFragments(RackawareEnsemblePlacementPolicy.class,
 null);
+    }
+
+    @Test
+    public void testReplicationStats() throws Exception {
+        BiConsumer<Boolean, ReplicationWorker> checkReplicationStats = (first, 
rw) -> {
+            try {
+                final Method rereplicate = 
rw.getClass().getDeclaredMethod("rereplicate");
+                rereplicate.setAccessible(true);
+                final Object result = rereplicate.invoke(rw);
+                final Field statsLoggerField = 
rw.getClass().getDeclaredField("statsLogger");
+                statsLoggerField.setAccessible(true);
+                final TestStatsLogger statsLogger = (TestStatsLogger) 
statsLoggerField.get(rw);
+
+                final Counter numDeferLedgerLockReleaseOfFailedLedgerCounter =
+                        
statsLogger.getCounter(ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
+                final Counter numLedgersReplicatedCounter =
+                        
statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+
+                assertEquals("NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER",
+                        1, 
numDeferLedgerLockReleaseOfFailedLedgerCounter.get().longValue());
+
+                if (first) {
+                    assertFalse((boolean) result);
+                    assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
+                            0, numLedgersReplicatedCounter.get().longValue());
+                } else {
+                    assertTrue((boolean) result);
+                    assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
+                            1, numLedgersReplicatedCounter.get().longValue());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+        testRepairedNotAdheringPlacementPolicyLedgerFragments(
+                RackawareEnsemblePlacementPolicy.class, checkReplicationStats);
     }
 
     private void testRepairedNotAdheringPlacementPolicyLedgerFragments(
-            Class<? extends EnsemblePlacementPolicy> placementPolicyClass) 
throws Exception {
+            Class<? extends EnsemblePlacementPolicy> placementPolicyClass,
+            BiConsumer<Boolean, ReplicationWorker> checkReplicationStats) 
throws Exception {
         List<BookieId> firstThreeBookies = servers.stream().map(ele -> {
             try {
                 return ele.getServer().getBookieId();
@@ -1244,12 +1285,23 @@ public class TestReplicationWorker extends 
BookKeeperClusterTestCase {
                 return ensemblePlacementPolicy;
             }
         };
-        ReplicationWorker rw = new ReplicationWorker(baseConf, bookKeeper, 
false, NullStatsLogger.INSTANCE);
-        rw.start();
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = 
statsProvider.getStatsLogger(REPLICATION_SCOPE);
+        ReplicationWorker rw = new ReplicationWorker(baseConf, bookKeeper, 
false, statsLogger);
+
+        if (checkReplicationStats != null) {
+            checkReplicationStats.accept(true, rw);
+        } else {
+            rw.start();
+        }
 
         //start new bookie, the rack is /rack2
         BookieId newBookieId = startNewBookieAndReturnBookieId();
 
+        if (checkReplicationStats != null) {
+            checkReplicationStats.accept(false, rw);
+        }
+
         Awaitility.await().untilAsserted(() -> {
             LedgerMetadata metadata = 
bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
             List<BookieId> newBookies = metadata.getAllEnsembles().get(0L);
@@ -1267,6 +1319,10 @@ public class TestReplicationWorker extends 
BookKeeperClusterTestCase {
         }
 
         verifyRecoveredLedgers(lh, 0, entrySize - 1);
+
+        if (checkReplicationStats == null) {
+            rw.shutdown();
+        }
     }
 
     private EnsemblePlacementPolicy 
buildRackAwareEnsemblePlacementPolicy(List<BookieId> bookieIds) {

Reply via email to