This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f17b422  ISSUE #966: Expose quorum write complete latency to the client
f17b422 is described below

commit f17b422f2474e436ab7722dca16b778aa626f8ca
Author: JV <[email protected]>
AuthorDate: Sun Feb 18 00:44:32 2018 -0800

    ISSUE #966: Expose quorum write complete latency to the client
    
    Add quorum write complete latency to the bookkeeper client API. This
    allows callers to find out exactly how long the quorum write took for
    diagnostic purposes.
    
    (bug W-3616193)
    Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com>
    [Fixed merge conflicts, added testing]
    Signed-off-by: Samuel Just <sjustsalesforce.com>
    
    Author: JV <[email protected]>
    
    Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]>
    
    This closes #970 from athanatos/forupstream/stats1/qlatency, closes #966
---
 .../apache/bookkeeper/client/AsyncCallback.java    | 46 +++++++++++++++++++++-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 42 ++++++++++++++++----
 .../apache/bookkeeper/client/LedgerHandleAdv.java  | 42 ++++++++++++++++----
 .../org/apache/bookkeeper/client/PendingAddOp.java | 11 ++++--
 ...BookieWriteLedgersWithDifferentDigestsTest.java |  9 +++--
 .../bookkeeper/test/BookKeeperClusterTestCase.java | 19 +++++++++
 6 files changed, 144 insertions(+), 25 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
index 7e19f44..cde3f06 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
@@ -26,6 +26,30 @@ import 
org.apache.bookkeeper.common.annotation.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface AsyncCallback {
+    /**
+     * Async Callback for adding entries to ledgers with latency information.
+     *
+     * @since 4.7
+     */
+    @InterfaceAudience.Public
+    @InterfaceStability.Evolving
+    interface AddCallbackWithLatency {
+        /**
+         * Callback declaration which additionally passes quorum write 
complete latency.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param entryId
+         *          entry identifier
+         * @param qwcLatency
+         *          QuorumWriteComplete Latency
+         * @param ctx
+         *          context object
+         */
+        void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, 
long qwcLatency, Object ctx);
+    }
 
     /**
      * Async Callback for adding entries to ledgers.
@@ -34,9 +58,9 @@ public interface AsyncCallback {
      */
     @InterfaceAudience.Public
     @InterfaceStability.Stable
-    interface AddCallback {
+    interface AddCallback extends AddCallbackWithLatency {
         /**
-         * Callback declaration.
+         * Callback to implement if latency information is not desired.
          *
          * @param rc
          *          return code
@@ -48,6 +72,24 @@ public interface AsyncCallback {
          *          context object
          */
         void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
+
+        /**
+         * Callback declaration which additionally passes quorum write 
complete latency.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param entryId
+         *          entry identifier
+         * @param qwcLatency
+         *          QuorumWriteComplete Latency
+         * @param ctx
+         *          context object
+         */
+        default void addCompleteWithLatency(int rc, LedgerHandle lh, long 
entryId, long qwcLatency, Object ctx) {
+            addComplete(rc, lh, entryId, ctx);
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 007f4ce..7047b37 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
@@ -943,7 +944,7 @@ public class LedgerHandle implements WriteHandle {
      */
     public void asyncAddEntry(final long entryId, final byte[] data, final 
AddCallback cb, final Object ctx) {
         LOG.error("To use this feature Ledger must be created with 
createLedgerAdv() interface.");
-        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, 
entryId, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, 
LedgerHandle.this, entryId, 0, ctx);
     }
 
     /**
@@ -1003,7 +1004,32 @@ public class LedgerHandle implements WriteHandle {
     public void asyncAddEntry(final long entryId, final byte[] data, final int 
offset, final int length,
             final AddCallback cb, final Object ctx) {
         LOG.error("To use this feature Ledger must be created with 
createLedgerAdv() interface.");
-        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, 
entryId, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, 
LedgerHandle.this, entryId, 0, ctx);
+    }
+
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to 
a
+     *             value higher than the length of data.
+     */
+    public void asyncAddEntry(final long entryId, final byte[] data, final int 
offset, final int length,
+                              final AddCallbackWithLatency cb, final Object 
ctx) {
+        LOG.error("To use this feature Ledger must be created with 
createLedgerAdv() interface.");
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, 
LedgerHandle.this, entryId, 0, ctx);
     }
 
     /**
@@ -1050,8 +1076,8 @@ public class LedgerHandle implements WriteHandle {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", 
ledgerId);
-                        
op.cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+                        
op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
+                                LedgerHandle.this, INVALID_ENTRY_ID, 0, 
op.ctx);
                     }
 
                     @Override
@@ -1060,8 +1086,8 @@ public class LedgerHandle implements WriteHandle {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+                
op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
             }
             return;
         }
@@ -1069,8 +1095,8 @@ public class LedgerHandle implements WriteHandle {
         try {
             bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                    LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+            
op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                    LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 0866db7..8058103 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
@@ -151,8 +152,33 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
         asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), 
cb, ctx);
     }
 
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to 
a
+     *             value higher than the length of data.
+     */
+    @Override
+    public void asyncAddEntry(final long entryId, final byte[] data, final int 
offset, final int length,
+                              final AddCallbackWithLatency cb, final Object 
ctx) {
+        asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), 
cb, ctx);
+    }
+
     private void asyncAddEntry(final long entryId, ByteBuf data,
-            final AddCallback cb, final Object ctx) {
+            final AddCallbackWithLatency cb, final Object ctx) {
         PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
         op.setEntryId(entryId);
 
@@ -196,8 +222,8 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", 
ledgerId);
-                        
op.cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+                        
op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
+                                LedgerHandleAdv.this, op.getEntryId(), 0, 
op.ctx);
                     }
                     @Override
                     public String toString() {
@@ -205,8 +231,8 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+                
op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
             }
             return;
         }
@@ -214,8 +240,8 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
         try {
             bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                              LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+            
op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                              LedgerHandleAdv.this, op.getEntryId(), 0, 
op.ctx);
         }
     }
 
@@ -231,7 +257,7 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
      */
     @Override
     public void asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx) {
-        cb.addComplete(BKException.Code.IllegalOpException, this, 
LedgerHandle.INVALID_ENTRY_ID, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, this, 
LedgerHandle.INVALID_ENTRY_ID, 0, ctx);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index e386701..8d65c00 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -30,7 +30,7 @@ import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -54,7 +54,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
 
     ByteBuf payload;
     ByteBuf toSend;
-    AddCallback cb;
+    AddCallbackWithLatency cb;
     Object ctx;
     long entryId;
     int entryLength;
@@ -65,6 +65,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
     LedgerHandle lh;
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
+    long qwcLatency; // Quorum Write Completion Latency after response from 
quorum bookies.
 
     long timeoutNanos;
 
@@ -74,7 +75,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
     boolean callbackTriggered;
     boolean hasRun;
 
-    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback 
cb, Object ctx) {
+    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, 
AddCallbackWithLatency cb, Object ctx) {
         PendingAddOp op = RECYCLER.get();
         op.lh = lh;
         op.isRecoveryAdd = false;
@@ -93,6 +94,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
         op.callbackTriggered = false;
         op.hasRun = false;
         op.requestTimeNanos = Long.MAX_VALUE;
+        op.qwcLatency = 0;
         return op;
     }
 
@@ -322,6 +324,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
 
         if (ackQuorum && !completed) {
             completed = true;
+            this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);
 
             sendAddSuccessCallbacks();
         }
@@ -344,7 +347,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
         } else {
             addOpLogger.registerSuccessfulEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
         }
-        cb.addComplete(rc, lh, entryId, ctx);
+        cb.addCompleteWithLatency(rc, lh, entryId, qwcLatency, ctx);
         callbackTriggered = true;
 
         maybeRecycle();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
index daa434c..9ab461a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookieException.Code.OK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,7 +32,6 @@ import java.util.Collection;
 import java.util.Enumeration;
 import java.util.Random;
 
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Before;
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
  */
 @RunWith(Parameterized.class)
 public class BookieWriteLedgersWithDifferentDigestsTest extends
-    BookKeeperClusterTestCase implements AddCallback {
+    BookKeeperClusterTestCase implements AsyncCallback.AddCallbackWithLatency {
 
     private static final Logger LOG = LoggerFactory
             .getLogger(BookieWriteLedgersWithDifferentDigestsTest.class);
@@ -192,8 +192,11 @@ public class BookieWriteLedgersWithDifferentDigestsTest 
extends
     }
 
     @Override
-    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) 
{
+    public void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, 
long qwcLatency, Object ctx) {
         SyncObj x = (SyncObj) ctx;
+        captureThrowable(() -> {
+            assertTrue("Successful write should have non-zero latency", rc != 
OK || qwcLatency > 0);
+        });
         synchronized (x) {
             x.rc = rc;
             x.counter++;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 6365c15..b965d15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.test;
 
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.base.Stopwatch;
 import java.io.File;
 import java.io.IOException;
@@ -32,6 +34,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -97,6 +100,16 @@ public abstract class BookKeeperClusterTestCase {
 
     private boolean isAutoRecoveryEnabled;
 
+    SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
+    protected void captureThrowable(Runnable c) {
+        try {
+            c.run();
+        } catch (Throwable e) {
+            LOG.error("Captured error: {}", e);
+            asyncExceptions.add(e);
+        }
+    }
+
     public BookKeeperClusterTestCase(int numBookies) {
         this(numBookies, 120);
     }
@@ -128,6 +141,12 @@ public abstract class BookKeeperClusterTestCase {
 
     @After
     public void tearDown() throws Exception {
+        boolean failed = false;
+        for (Throwable e : asyncExceptions) {
+            LOG.error("Got async exception: {}", e);
+            failed = true;
+        }
+        assertTrue("Async failure", !failed);
         Stopwatch sw = Stopwatch.createStarted();
         LOG.info("TearDown");
         Exception tearDownException = null;

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to