Repository: kudu Updated Branches: refs/heads/master 71f21f0ab -> 7bcfcf64c
[java client] AsyncKuduClient#delayedSendRpcToTablet should return a Deferred There's some confusion in the Java client code regarding where Deferred objects are returned and also sent via an RPC's callback. Such a place is delayedSendRpcToTablet, since it's called from both the context of paths that come from the user application (need a Deferred) and paths that simply handle background retries (don't need a Deferred since we rely on KuduRpc#callback). This problem manifested itself in a unit test: 02:15:40.300 [DEBUG - main] (AsyncKuduClient.java:1065) Cannot continue with this RPC: Batch{operations=1, tablet="4114911c600b4bc3bbca228f1880568c" [<start>, <end>), ignoreAllDuplicateRows=false} because of: RPC can not complete before timeout: org.apache.kudu.client.NonRecoverableException: RPC can not complete before timeout: Batch{operations=1, tablet="4114911c600b4bc3bbca228f1880568c" [<start>, <end>), ignoreAllDuplicateRows=false} at org.apache.kudu.client.AsyncKuduClient.tooManyAttemptsOrTimeout(AsyncKuduClient.java:1063) at org.apache.kudu.client.AsyncKuduClient.delayedSendRpcToTablet(AsyncKuduClient.java:1275) at org.apache.kudu.client.AsyncKuduClient.sendRpcToTablet(AsyncKuduClient.java:756) at org.apache.kudu.client.AsyncKuduSession$TabletLookupCB.call(AsyncKuduSession.java:371) at org.apache.kudu.client.AsyncKuduSession$TabletLookupCB.call(AsyncKuduSession.java:302) at com.stumbleupon.async.Deferred.doCall(Deferred.java:1280) at com.stumbleupon.async.Deferred.addCallbacks(Deferred.java:685) at com.stumbleupon.async.Deferred.addBoth(Deferred.java:769) at org.apache.kudu.util.AsyncUtil.addBoth(AsyncUtil.java:59) at org.apache.kudu.client.AsyncKuduSession.doFlush(AsyncKuduSession.java:436) at org.apache.kudu.client.AsyncKuduSession.flush(AsyncKuduSession.java:405) at org.apache.kudu.client.TestAsyncKuduSession.testInsertIntoUnavailableTablet(TestAsyncKuduSession.java:164) Here the Deferred.fromError that got created in tooManyAttemptsOrTimeout was ignored by delayedSendRpcToTablet, then in sendRpcToTablet we sent back the RPC's Deferred which at that point was voided since tooManyAttemptsOrTimeout _also_ called that RPC's callback. I tried writing a test, but even with looping on my machine I can't repro the issue. Change-Id: I7c5e87b7dc3366ed9d72121a2421c5cd2304608b Reviewed-on: http://gerrit.cloudera.org:8080/4654 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <a...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7bcfcf64 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7bcfcf64 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7bcfcf64 Branch: refs/heads/master Commit: 7bcfcf64ccc2300f482e11e73d9703b92093394b Parents: 71f21f0 Author: Jean-Daniel Cryans <jdcry...@apache.org> Authored: Thu Oct 6 15:57:28 2016 -0700 Committer: Adar Dembo <a...@cloudera.com> Committed: Wed Oct 12 18:05:40 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/7bcfcf64/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index cedd111..84e60de 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -654,8 +654,7 @@ public class AsyncKuduClient implements AutoCloseable { // A null client means we either don't know about this tablet anymore (unlikely) or we // couldn't find a leader (which could be triggered by a read timeout). // We'll first delay the RPC in case things take some time to settle down, then retry. - delayedSendRpcToTablet(next_request, null); - return next_request.getDeferred(); + return delayedSendRpcToTablet(next_request, null); } client.sendRpc(next_request); return d; @@ -753,8 +752,7 @@ public class AsyncKuduClient implements AutoCloseable { if (newTabletClient == null) { // Wait a little bit before hitting the master. - delayedSendRpcToTablet(request, null); - return request.getDeferred(); + return delayedSendRpcToTablet(request, null); } if (!newTabletClient.isAlive()) { @@ -837,9 +835,7 @@ public class AsyncKuduClient implements AutoCloseable { @Override public Deferred<R> call(Exception arg) { if (arg instanceof RecoverableException) { - Deferred<R> d = request.getDeferred(); - delayedSendRpcToTablet(request, (KuduException) arg); - return d; + return delayedSendRpcToTablet(request, (KuduException) arg); } if (LOG.isDebugEnabled()) { LOG.debug(String.format("Notify RPC %s after lookup exception", request), arg); @@ -1257,10 +1253,22 @@ public class AsyncKuduClient implements AutoCloseable { <R> void handleRetryableError(final KuduRpc<R> rpc, KuduException ex) { // TODO we don't always need to sleep, maybe another replica can serve this RPC. + // We don't care about the returned Deferred in this case, since we're not in a context where + // we're eventually returning a Deferred. delayedSendRpcToTablet(rpc, ex); } - private <R> void delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) { + /** + * This methods enable putting RPCs on hold for a period of time determined by + * {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries, its errback will + * be immediately called. + * @param rpc the RPC to retry later + * @param ex the reason why we need to retry, might be null + * @return a Deferred object to use if this method is called inline with the user's original + * attempt to send the RPC. Can be ignored in any other context that doesn't need to return a + * Deferred back to the user. + */ + private <R> Deferred<R> delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) { // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones // on hold but we won't be doing this for the moment. Regions in HBase can move a lot, @@ -1272,11 +1280,11 @@ public class AsyncKuduClient implements AutoCloseable { } long sleepTime = getSleepTimeForRpc(rpc); if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) { - tooManyAttemptsOrTimeout(rpc, ex); // Don't let it retry. - return; + return tooManyAttemptsOrTimeout(rpc, ex); } newTimeout(new RetryTimer(), sleepTime); + return rpc.getDeferred(); } /**