Repository: kudu
Updated Branches:
  refs/heads/master 71f21f0ab -> 7bcfcf64c

[java client] AsyncKuduClient#delayedSendRpcToTablet should return a Deferred

There's some confusion in the Java client code regarding where Deferred objects
are returned and also sent via an RPC's callback. Such a place is 
since it's called from both the context of paths that come from the user
application (need a Deferred) and paths that simply handle background retries
(don't need a Deferred since we rely on KuduRpc#callback).

This problem manifested itself in a unit test:
02:15:40.300 [DEBUG - main] ( Cannot continue with 
this RPC: Batch{operations=1, tablet="4114911c600b4bc3bbca228f1880568c" 
[<start>, <end>), ignoreAllDuplicateRows=false} because of: RPC can not 
complete before timeout:
org.apache.kudu.client.NonRecoverableException: RPC can not complete before 
timeout: Batch{operations=1, tablet="4114911c600b4bc3bbca228f1880568c" 
[<start>, <end>), ignoreAllDuplicateRows=false}
        at com.stumbleupon.async.Deferred.doCall(
        at com.stumbleupon.async.Deferred.addCallbacks(
        at com.stumbleupon.async.Deferred.addBoth(
        at org.apache.kudu.util.AsyncUtil.addBoth(

Here the Deferred.fromError created in tooManyAttemptsOrTimeout was ignored by
delayedSendRpcToTablet. sendRpcToTablet then returned the RPC's own Deferred,
which by that point had been voided because tooManyAttemptsOrTimeout _also_
called that RPC's callback.
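
The failure mode described above can be illustrated with a small, self-contained
sketch. It is not the Kudu client code: it uses java.util.concurrent.CompletableFuture
as a stand-in for com.stumbleupon.async.Deferred, and SketchRpc, errback,
sendBefore, sendAfter and MAX_ATTEMPTS are hypothetical names invented for the
illustration. It also assumes that "voided" means the RPC drops its stored
Deferred once its callback has fired, so that a later getDeferred() hands back a
fresh object that nothing will ever complete.

```java
import java.util.concurrent.CompletableFuture;

public class DeferredSketch {

  /** Hypothetical stand-in for KuduRpc; not the real class. */
  static final class SketchRpc {
    private CompletableFuture<String> future;
    final int attempts;

    SketchRpc(int attempts) {
      this.attempts = attempts;
    }

    /** Lazily creates the future, mirroring the role of KuduRpc#getDeferred. */
    CompletableFuture<String> getFuture() {
      if (future == null) {
        future = new CompletableFuture<>();
      }
      return future;
    }

    /** Fires the error path, then drops the stored future (assumed "voiding"). */
    void errback(Exception e) {
      CompletableFuture<String> f = getFuture();
      future = null;
      f.completeExceptionally(e);
    }
  }

  static final int MAX_ATTEMPTS = 3;

  /** Calls the RPC's errback and also returns an already-failed future. */
  static CompletableFuture<String> tooManyAttemptsOrTimeout(SketchRpc rpc) {
    Exception e = new IllegalStateException("RPC can not complete before timeout");
    rpc.errback(e);
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(e);
    return failed;
  }

  /** Shape of the method after the change: the result is always returned. */
  static CompletableFuture<String> delayedSend(SketchRpc rpc) {
    if (rpc.attempts >= MAX_ATTEMPTS) {
      // Don't let it retry.
      return tooManyAttemptsOrTimeout(rpc);
    }
    // A real client would schedule a retry timer here.
    return rpc.getFuture();
  }

  /** Old caller pattern: ignore the result, then ask the RPC for its future. */
  static CompletableFuture<String> sendBefore(SketchRpc rpc) {
    delayedSend(rpc);
    return rpc.getFuture(); // fresh future; the error was delivered to the old one
  }

  /** New caller pattern: hand back whatever delayedSend returned. */
  static CompletableFuture<String> sendAfter(SketchRpc rpc) {
    return delayedSend(rpc);
  }

  public static void main(String[] args) {
    CompletableFuture<String> before = sendBefore(new SketchRpc(MAX_ATTEMPTS));
    CompletableFuture<String> after = sendAfter(new SketchRpc(MAX_ATTEMPTS));
    System.out.println("old pattern, caller notified: " + before.isDone());
    System.out.println("new pattern, caller notified: " + after.isCompletedExceptionally());
  }
}
```

With the old pattern the user-facing caller is left holding a future that never
completes, while the new pattern surfaces the timeout immediately. Background
retry paths can still ignore the return value, because they rely on the callback.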

I tried writing a test, but even with looping on my machine I can't repro the 

Change-Id: I7c5e87b7dc3366ed9d72121a2421c5cd2304608b
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <>


Branch: refs/heads/master
Commit: 7bcfcf64ccc2300f482e11e73d9703b92093394b
Parents: 71f21f0
Author: Jean-Daniel Cryans <>
Authored: Thu Oct 6 15:57:28 2016 -0700
Committer: Adar Dembo <>
Committed: Wed Oct 12 18:05:40 2016 +0000

 .../org/apache/kudu/client/ | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
diff --git 
index cedd111..84e60de 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/
@@ -654,8 +654,7 @@ public class AsyncKuduClient implements AutoCloseable {
      // A null client means we either don't know about this tablet anymore (unlikely) or we
       // couldn't find a leader (which could be triggered by a read timeout).
      // We'll first delay the RPC in case things take some time to settle down, then retry.
-      delayedSendRpcToTablet(next_request, null);
-      return next_request.getDeferred();
+      return delayedSendRpcToTablet(next_request, null);
     return d;
@@ -753,8 +752,7 @@ public class AsyncKuduClient implements AutoCloseable {
         if (newTabletClient == null) {
           // Wait a little bit before hitting the master.
-          delayedSendRpcToTablet(request, null);
-          return request.getDeferred();
+          return delayedSendRpcToTablet(request, null);
         if (!newTabletClient.isAlive()) {
@@ -837,9 +835,7 @@ public class AsyncKuduClient implements AutoCloseable {
     public Deferred<R> call(Exception arg) {
       if (arg instanceof RecoverableException) {
-        Deferred<R> d = request.getDeferred();
-        delayedSendRpcToTablet(request, (KuduException) arg);
-        return d;
+        return delayedSendRpcToTablet(request, (KuduException) arg);
       if (LOG.isDebugEnabled()) {
         LOG.debug(String.format("Notify RPC %s after lookup exception", request), arg);
@@ -1257,10 +1253,22 @@ public class AsyncKuduClient implements AutoCloseable {
   <R> void handleRetryableError(final KuduRpc<R> rpc, KuduException ex) {
    // TODO we don't always need to sleep, maybe another replica can serve this RPC.
+    // We don't care about the returned Deferred in this case, since we're not in a context where
+    // we're eventually returning a Deferred.
     delayedSendRpcToTablet(rpc, ex);
-  private <R> void delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
+  /**
+   * This method enables putting RPCs on hold for a period of time determined by
+   * {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries, its errback will
+   * be immediately called.
+   * @param rpc the RPC to retry later
+   * @param ex the reason why we need to retry, might be null
+   * @return a Deferred object to use if this method is called inline with the user's original
+   * attempt to send the RPC. Can be ignored in any other context that doesn't need to return a
+   * Deferred back to the user.
+   */
+  private <R> Deferred<R> delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
     // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs
     // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones
     // on hold but we won't be doing this for the moment. Regions in HBase can move a lot,
@@ -1272,11 +1280,11 @@ public class AsyncKuduClient implements AutoCloseable {
     long sleepTime = getSleepTimeForRpc(rpc);
    if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
-      tooManyAttemptsOrTimeout(rpc, ex);
       // Don't let it retry.
-      return;
+      return tooManyAttemptsOrTimeout(rpc, ex);
     newTimeout(new RetryTimer(), sleepTime);
+    return rpc.getDeferred();
