Repository: kudu
Updated Branches:
  refs/heads/master f1da1eb45 -> 7f2624ae4


KUDU-1906. Fix lost callback for scanner path

Fixes another case similar to KUDU-1888 in which we were sending an RPC
before setting its deferred. In the case that the RPC responded very
quickly, the response would come before the callback was attached, and
the callback would never get called.

This caused my RowCounter jobs on a small/underpowered test cluster to
have task timeouts a few percent of the time.

This patch fixes the particular instance and also adds some assertions
to try to prevent this style of bug in the future.

Change-Id: I102778e87d0f153cdd2a1ca2aed3ec1e17014d4b
Reviewed-on: http://gerrit.cloudera.org:8080/6239
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f2624ae
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f2624ae
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f2624ae

Branch: refs/heads/master
Commit: 7f2624ae4e0132aee24f6b7b2af31e2219ac31f0
Parents: f1da1eb
Author: Todd Lipcon <[email protected]>
Authored: Thu Mar 2 19:13:57 2017 -0800
Committer: Jean-Daniel Cryans <[email protected]>
Committed: Fri Mar 3 15:44:54 2017 +0000

----------------------------------------------------------------------
 .../src/main/java/org/apache/kudu/client/AsyncKuduClient.java  | 3 ++-
 .../src/main/java/org/apache/kudu/client/KuduRpc.java          | 5 +++++
 .../src/main/java/org/apache/kudu/client/TabletClient.java     | 1 +
 .../test/java/org/apache/kudu/client/TestAsyncKuduSession.java | 3 ++-
 4 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7f2624ae/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 6639284..f1c8adc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -683,8 +683,9 @@ public class AsyncKuduClient implements AutoCloseable {
           " will retry after a delay");
       return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
     }
+    Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
     client.sendRpc(nextRequest);
-    return nextRequest.getDeferred();
+    return d;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f2624ae/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 4a8b971..09105fc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -223,6 +223,7 @@ public abstract class KuduRpc<R> {
   private void handleCallback(final Object result) {
     final Deferred<R> d = deferred;
     if (d == null) {
+      LOG.debug("Handling a callback on RPC {} with no deferred attached!", this);
       return;
     }
     deferred = null;
@@ -299,6 +300,10 @@ public abstract class KuduRpc<R> {
     return deferred;
   }
 
+  boolean hasDeferred() {
+    return deferred != null;
+  }
+
   RemoteTablet getTablet() {
     return this.tablet;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f2624ae/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 938af00..4d44406 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -152,6 +152,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
   }
 
   <R> void sendRpc(KuduRpc<R> rpc) {
+    Preconditions.checkArgument(rpc.hasDeferred());
     rpc.addTrace(
         new RpcTraceFrame.RpcTraceFrameBuilder(
             rpc.method(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f2624ae/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index b7f9db3..d84f972 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -150,8 +150,9 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     // Wait until tablet is deleted on TS.
     while (true) {
       ListTabletsRequest req = new ListTabletsRequest();
+      Deferred<ListTabletsResponse> d = req.getDeferred();
       tc.sendRpc(req);
-      ListTabletsResponse resp = req.getDeferred().join();
+      ListTabletsResponse resp = d.join();
       if (!resp.getTabletsList().contains(tabletId)) {
         break;
       }
