This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5203d03  Issue #1152: stats for durability violations in write/read path
5203d03 is described below

commit 5203d03930356a322f301e8e72a052ce1972adaa
Author: JV Jujjuri <[email protected]>
AuthorDate: Sun Feb 18 00:53:36 2018 -0800

    Issue #1152: stats for durability violations in write/read path
    
    Descriptions of the changes in this PR:
    
    The durability contract requires WQ (write quorum) copies of each entry,
    spread across fault zones according to the ensemble placement policy.
    There are many places where this contract can be violated.
    
    Add stats to track two of these areas:
    a. Write path: the write fails after the ack quorum (AQ) has been satisfied.
    b. Read path: a checksum (digest) error is detected while reading an entry.
    
    Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com>
    
    Master Issue: #1152
    
    Author: JV Jujjuri <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1155 from jvrao/bk-ssue-1152-stats-1, closes #1152
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |  12 +-
 .../bookkeeper/client/BookKeeperClientStats.java   |   3 +
 .../org/apache/bookkeeper/client/PendingAddOp.java |   9 ++
 .../apache/bookkeeper/client/PendingReadOp.java    |   3 +
 .../bookkeeper/client/BookieWriteLedgerTest.java   | 134 ++++++++++++++++++++-
 5 files changed, 159 insertions(+), 2 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 4c50697..9eb9da9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -120,6 +120,9 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
     private OpStatsLogger recoverReadEntriesStats;
 
     private Counter speculativeReadCounter;
+    private Counter readOpDmCounter;
+    private Counter addOpUrCounter;
+
 
     // whether the event loop group is one we created, or is owned by whoever
     // instantiated us
@@ -1451,10 +1454,12 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
         openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
         recoverOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.RECOVER_OP);
         readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
+        readOpDmCounter = stats.getCounter(BookKeeperClientStats.READ_OP_DM);
         readLacAndEntryOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY);
         readLacAndEntryRespLogger = stats.getOpStatsLogger(
                 BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
         addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
+        addOpUrCounter = stats.getCounter(BookKeeperClientStats.ADD_OP_UR);
         writeLacOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
         readLacOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
         recoverAddEntriesStats = 
stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_ADD_ENTRIES);
@@ -1499,7 +1504,12 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
     OpStatsLogger getRecoverReadCountLogger() {
         return recoverReadEntriesStats;
     }
-
+    Counter getReadOpDmCounter() {
+        return readOpDmCounter;
+    }
+    Counter getAddOpUrCounter() {
+        return addOpUrCounter;
+    }
     static EventLoopGroup getDefaultEventLoopGroup() {
         ThreadFactory threadFactory = new 
DefaultThreadFactory("bookkeeper-io");
         final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 60058e3..e3d8438 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -40,7 +40,10 @@ public interface BookKeeperClientStats {
     // Data Operations
 
     String ADD_OP = "ADD_ENTRY";
+    String ADD_OP_UR = "ADD_ENTRY_UR"; // Under Replicated during AddEntry.
     String READ_OP = "READ_ENTRY";
+    // Corrupted entry (Digest Mismatch/ Under Replication) detected during 
ReadEntry
+    String READ_OP_DM = "READ_ENTRY_DM";
     String WRITE_LAC_OP = "WRITE_LAC";
     String READ_LAC_OP = "READ_LAC";
     String READ_LAST_CONFIRMED_AND_ENTRY = "READ_LAST_CONFIRMED_AND_ENTRY";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 8d65c00..639d042 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -34,6 +34,7 @@ import 
org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -70,6 +71,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
     long timeoutNanos;
 
     OpStatsLogger addOpLogger;
+    Counter addOpUrCounter;
     long currentLedgerLength;
     int pendingWriteRequests;
     boolean callbackTriggered;
@@ -89,6 +91,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
         op.completed = false;
         op.ackSet = lh.distributionSchedule.getAckSet();
         op.addOpLogger = lh.bk.getAddOpLogger();
+        op.addOpUrCounter = lh.bk.getAddOpUrCounter();
         op.timeoutNanos = lh.bk.addEntryQuorumTimeoutNanos;
         op.pendingWriteRequests = 0;
         op.callbackTriggered = false;
@@ -258,6 +261,11 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
         }
 
         if (completed) {
+            if (rc != BKException.Code.OK) {
+                // Got an error after satisfying AQ. This means we are under 
replicated at the create itself.
+                // Update the stat to reflect it.
+                addOpUrCounter.inc();
+            }
             // even the add operation is completed, but because we don't reset 
completed flag back to false when
             // #unsetSuccessAndSendWriteRequest doesn't break ack quorum 
constraint. we still have current pending
             // add op is completed but never callback. so do a check here to 
complete again.
@@ -427,6 +435,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
         lh = null;
         isRecoveryAdd = false;
         addOpLogger = null;
+        addOpUrCounter = null;
         completed = false;
         pendingWriteRequests = 0;
         callbackTriggered = false;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 58ee31d..0276ef4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -72,6 +72,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
     long endEntryId;
     long requestTimeNanos;
     OpStatsLogger readOpLogger;
+    Counter readOpDmCounter;
     private final Counter speculativeReadCounter;
 
     final int requiredBookiesMissingEntryForRecovery;
@@ -134,6 +135,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
             try {
                 content = 
lh.macManager.verifyDigestAndReturnData(entryImpl.getEntryId(), buffer);
             } catch (BKDigestMatchException e) {
+                readOpDmCounter.inc();
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", 
BKException.Code.DigestMatchException);
                 buffer.release();
                 return false;
@@ -478,6 +480,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         heardFromHostsBitSet = new 
BitSet(getLedgerMetadata().getEnsembleSize());
 
         readOpLogger = lh.bk.getReadOpLogger();
+        readOpDmCounter = lh.bk.getReadOpDmCounter();
         speculativeReadCounter = lh.bk.getSpeculativeReadCounter();
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 5135d8e..262f0c8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -20,12 +20,17 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.client.BookKeeperClientStats.ADD_OP;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.ADD_OP_UR;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_OP_DM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Enumeration;
@@ -38,12 +43,16 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException.BKLedgerClosedException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -144,8 +153,107 @@ public class BookieWriteLedgerTest extends
             entries1.add(entry.array());
             lh.addEntry(entry.array());
         }
+        readEntries(lh, entries1);
+        lh.close();
+    }
+
+    /**
+     * Verify write and Read durability stats.
+     */
+    @Test
+    public void testWriteAndReadStats() throws Exception {
+        // Create a ledger
+        lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
+
+        // write-batch-1
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+        assertTrue(
+                "Stats should have captured a new writes",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + ADD_OP)
+                        .getSuccessCount() > 0);
+
+        CountDownLatch sleepLatch1 = new CountDownLatch(1);
+        CountDownLatch sleepLatch2 = new CountDownLatch(1);
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+                .getEnsembles().entrySet().iterator().next().getValue();
+
+        sleepBookie(ensemble.get(0), sleepLatch1);
+
+        int i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+
+        // write-batch-2
+
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+
+        // Let the second bookie go to sleep. This forces write timeout and 
ensemble change
+        // Which will be enough time to receive delayed write failures on the 
write-batch-2
+
+        sleepBookie(ensemble.get(1), sleepLatch2);
+        i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+
+        // write-batch-3
+
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+
+        assertTrue(
+                "Stats should have captured a new UnderReplication during 
write",
+                bkc.getTestStatsProvider().getCounter(
+                        CLIENT_SCOPE + "." + ADD_OP_UR)
+                        .get() > 0);
+
+        sleepLatch1.countDown();
+        sleepLatch2.countDown();
+
+        // Replace the bookie with a fake bookie
+        ServerConfiguration conf = killBookie(ensemble.get(0));
+        BookieWriteLedgerTest.CorruptReadBookie corruptBookie = new 
BookieWriteLedgerTest.CorruptReadBookie(conf);
+        bs.add(startBookie(conf, corruptBookie));
+        bsConfs.add(conf);
+
+        i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+
+        // write-batch-4
+
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
 
         readEntries(lh, entries1);
+        assertTrue(
+                "Stats should have captured DigestMismatch on Read",
+                bkc.getTestStatsProvider().getCounter(
+                        CLIENT_SCOPE + "." + READ_OP_DM)
+                        .get() > 0);
         lh.close();
     }
 
@@ -1020,4 +1128,28 @@ public class BookieWriteLedgerTest extends
             x.notify();
         }
     }
+
+    static class CorruptReadBookie extends Bookie {
+
+        static final Logger LOG = 
LoggerFactory.getLogger(CorruptReadBookie.class);
+        ByteBuf localBuf;
+
+        public CorruptReadBookie(ServerConfiguration conf)
+                throws IOException, KeeperException, InterruptedException, 
BookieException {
+            super(conf);
+        }
+
+        @Override
+        public ByteBuf readEntry(long ledgerId, long entryId) throws 
IOException, NoLedgerException {
+            localBuf = super.readEntry(ledgerId, entryId);
+
+            int capacity = 0;
+            while (capacity < localBuf.capacity()) {
+                localBuf.setByte(capacity, 0);
+                capacity++;
+            }
+            return localBuf;
+        }
+
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to