Repository: kudu
Updated Branches:
refs/heads/master ce80f4ebd -> c53d9694f
[java client] Implement RPC tracing, part 1
The first part of this work adds the tracing objects and does the tracing. A
second patch will make this information available to users.
This patch uses a pretty simple method: it just shoves container objects into
a list, per RPC. The traces are lightweight and don't try anything fancy. We
also introduce the concept of a "parent RPC": when, say, a Write RPC spawns a
GetTableLocations RPC, the Write is set as the parent of the
GetTableLocations, so that the call to the master adds traces to both RPCs.
This patch doesn't add a nice way to present the traces (like JSON), but
here's a simple toString example:
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973547,
action=SEND_TO_SERVER, server=3926a6a73e994152be1336beb434154e},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973548,
action=RECEIVE_FROM_SERVER, server=3926a6a73e994152be1336beb434154e,
callStatus=Network error: [Peer 3926a6a73e994152be1336beb434154e] Connection
reset on [id: 0xc83743df]}
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973548,
action=SLEEP_THEN_RETRY, callStatus=Network error: [Peer
3926a6a73e994152be1336beb434154e] Connection reset on [id: 0xc83743df]},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973574,
action=QUERY_MASTER},
RpcTraceFrame{rpcMethod='GetTableLocations', timestampMs=1477079973574,
action=SEND_TO_SERVER, server=c0d4588690d241c69821ee773eebd185},
RpcTraceFrame{rpcMethod='GetTableLocations', timestampMs=1477079973576,
action=RECEIVE_FROM_SERVER, server=c0d4588690d241c69821ee773eebd185,
callStatus=OK},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973579,
action=PICKED_REPLICA},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973579,
action=SEND_TO_SERVER, server=0353a6d97d6c49f9a727bc1ee6c3393e},
This patch also fixes some paths where we weren't correctly passing a
timeout to an RPC that was created on behalf of another RPC (basically, the
paths where the parent RPC had to be set).
Change-Id: I69ef56acc071b9f80b34e38c1821df4096f54907
Reviewed-on: http://gerrit.cloudera.org:8080/4781
Reviewed-by: Dan Burkert <[email protected]>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bdbee44e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bdbee44e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bdbee44e
Branch: refs/heads/master
Commit: bdbee44e029b2ada382c09ccb63a522b6de66186
Parents: ce80f4e
Author: Jean-Daniel Cryans <[email protected]>
Authored: Fri Oct 21 13:10:31 2016 -0700
Committer: Jean-Daniel Cryans <[email protected]>
Committed: Fri Nov 4 22:09:52 2016 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 87 +++++++++-----
.../apache/kudu/client/AsyncKuduScanner.java | 2 +-
.../main/java/org/apache/kudu/client/Batch.java | 1 +
.../java/org/apache/kudu/client/KuduRpc.java | 57 +++++++++
.../org/apache/kudu/client/RpcTraceFrame.java | 117 +++++++++++++++++++
.../org/apache/kudu/client/TabletClient.java | 40 ++++++-
.../org/apache/kudu/client/BaseKuduTest.java | 3 +-
.../org/apache/kudu/client/TestRpcTraces.java | 115 ++++++++++++++++++
8 files changed, 387 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 06b9203..9274087 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -587,8 +587,8 @@ public class AsyncKuduClient implements AutoCloseable {
RemoteTablet tablet = scanner.currentTablet();
assert (tablet != null);
KuduRpc<AsyncKuduScanner.Response> nextRequest =
scanner.getNextRowsRequest();
- TabletClient client =
-
connectionCache.getClient(tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection()));
+ String uuid =
tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
+ TabletClient client = connectionCache.getClient(uuid);
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
@@ -597,7 +597,9 @@ public class AsyncKuduClient implements AutoCloseable {
// A null client means we either don't know about this tablet anymore
(unlikely) or we
// couldn't find a leader (which could be triggered by a read timeout).
// We'll first delay the RPC in case things take some time to settle
down, then retry.
- return delayedSendRpcToTablet(nextRequest, null);
+ Status statusRemoteError = Status.RemoteError("Not connected to server "
+ uuid
+ + " will retry after a delay");
+ return delayedSendRpcToTablet(nextRequest, new
RecoverableException(statusRemoteError));
}
client.sendRpc(nextRequest);
return d;
@@ -685,6 +687,12 @@ public class AsyncKuduClient implements AutoCloseable {
}
}
+ request.addTrace(
+ new RpcTraceFrame.RpcTraceFrameBuilder(
+ request.method(),
+ RpcTraceFrame.Action.QUERY_MASTER)
+ .build());
+
// We fall through to here in two cases:
//
// 1) This client has not yet discovered the tablet which is responsible
for
@@ -702,7 +710,7 @@ public class AsyncKuduClient implements AutoCloseable {
Callback<Deferred<R>, Master.GetTableLocationsResponsePB> cb = new
RetryRpcCB<>(request);
Callback<Deferred<R>, Exception> eb = new RetryRpcErrback<>(request);
Deferred<Master.GetTableLocationsResponsePB> returnedD =
- locateTablet(request.getTable(), partitionKey);
+ locateTablet(request.getTable(), partitionKey, request);
return AsyncUtil.addCallbacksDeferring(returnedD, cb, eb);
}
@@ -712,7 +720,7 @@ public class AsyncKuduClient implements AutoCloseable {
* <p>
* Use {@code AsyncUtil.addCallbacksDeferring} to add this as the callback
and
* {@link AsyncKuduClient.RetryRpcErrback} as the "errback" to the {@code
Deferred}
- * returned by {@link #locateTablet(KuduTable, byte[])}.
+ * returned by {@link #locateTablet(KuduTable, byte[], KuduRpc)}.
* @param <R> RPC's return type.
* @param <D> Previous query's return type, which we don't use, but need to
specify in order to
* tie it all together.
@@ -739,7 +747,7 @@ public class AsyncKuduClient implements AutoCloseable {
* <p>
* Use {@code AsyncUtil.addCallbacksDeferring} to add this as the "errback"
and
* {@link RetryRpcCB} as the callback to the {@code Deferred} returned by
- * {@link #locateTablet(KuduTable, byte[])}.
+ * {@link #locateTablet(KuduTable, byte[], KuduRpc)}.
* @see #delayedSendRpcToTablet(KuduRpc, KuduException)
* @param <R> The type of the original RPC.
*/
@@ -818,10 +826,12 @@ public class AsyncKuduClient implements AutoCloseable {
}
}
}
- IsCreateTableDoneRequest rpc = new
IsCreateTableDoneRequest(masterTable, tableId);
- rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+ IsCreateTableDoneRequest isCreateTableDoneRequest =
+ new IsCreateTableDoneRequest(masterTable, tableId);
+
isCreateTableDoneRequest.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+ isCreateTableDoneRequest.setParentRpc(rpc);
final Deferred<Master.IsCreateTableDoneResponsePB> d =
- sendRpcToTablet(rpc).addCallback(new IsCreateTableDoneCB(tableId));
+ sendRpcToTablet(isCreateTableDoneRequest).addCallback(new
IsCreateTableDoneCB(tableId));
if (has_permit) {
// The errback is needed here to release the lookup permit
d.addCallbacks(new
ReleaseMasterLookupPermit<Master.IsCreateTableDoneResponsePB>(),
@@ -958,13 +968,15 @@ public class AsyncKuduClient implements AutoCloseable {
* Sends a getTableLocations RPC to the master to find the table's tablets.
* @param table table to lookup
* @param partitionKey can be null, if not we'll find the exact tablet that
contains it
+ * @param parentRpc RPC that prompted a master lookup, can be null
* @return Deferred to track the progress
*/
private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable
table,
- byte[]
partitionKey) {
- final boolean has_permit = acquireMasterLookupPermit();
+ byte[]
partitionKey,
+ KuduRpc<?>
parentRpc) {
+ boolean hasPermit = acquireMasterLookupPermit();
String tableId = table.getTableId();
- if (!has_permit) {
+ if (!hasPermit) {
// If we failed to acquire a permit, it's worth checking if someone
// looked up the tablet we're interested in. Every once in a while
// this will save us a Master lookup.
@@ -974,22 +986,27 @@ public class AsyncKuduClient implements AutoCloseable {
return Deferred.fromResult(null); // Looks like no lookup needed.
}
}
- // Leave the end of the partition key range empty in order to pre-fetch
tablet locations.
- GetTableLocationsRequest rpc =
- new GetTableLocationsRequest(masterTable, partitionKey, null, tableId);
- rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
- final Deferred<Master.GetTableLocationsResponsePB> d;
// If we know this is going to the master, check the master consensus
// configuration (as specified by 'masterAddresses' field) to determine and
// cache the current leader.
+ Deferred<Master.GetTableLocationsResponsePB> d;
if (isMasterTable(tableId)) {
- d = getMasterTableLocationsPB();
+ d = getMasterTableLocationsPB(parentRpc);
} else {
+ // Leave the end of the partition key range empty in order to pre-fetch
tablet locations.
+ GetTableLocationsRequest rpc =
+ new GetTableLocationsRequest(masterTable, partitionKey, null,
tableId);
+ if (parentRpc != null) {
+
rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
+ rpc.setParentRpc(parentRpc);
+ } else {
+ rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+ }
d = sendRpcToTablet(rpc);
}
d.addCallback(new MasterLookupCB(table, partitionKey));
- if (has_permit) {
+ if (hasPermit) {
d.addBoth(new
ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>());
}
return d;
@@ -1000,7 +1017,7 @@ public class AsyncKuduClient implements AutoCloseable {
* fill a {@link Master.GetTabletLocationsResponsePB} object.
* @return An initialized Deferred object to hold the response.
*/
- Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB() {
+ Deferred<Master.GetTableLocationsResponsePB>
getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
final Deferred<Master.GetTableLocationsResponsePB> responseD = new
Deferred<>();
final GetMasterRegistrationReceived received =
new GetMasterRegistrationReceived(masterAddresses, responseD);
@@ -1017,7 +1034,7 @@ public class AsyncKuduClient implements AutoCloseable {
Status statusIOE = Status.IOError(message);
d = Deferred.fromError(new NonRecoverableException(statusIOE));
} else {
- d = getMasterRegistration(clientForHostAndPort);
+ d = getMasterRegistration(clientForHostAndPort, parentRpc);
}
d.addCallbacks(received.callbackForNode(hostAndPort),
received.errbackForNode(hostAndPort));
}
@@ -1088,7 +1105,7 @@ public class AsyncKuduClient implements AutoCloseable {
// When lookup completes, the tablet (or non-covered range) for the next
// partition key will be located and added to the client's cache.
final byte[] lookupKey = partitionKey;
- return locateTablet(table, key).addCallbackDeferring(
+ return locateTablet(table, key, null).addCallbackDeferring(
new Callback<Deferred<List<LocatedTablet>>,
GetTableLocationsResponsePB>() {
@Override
public Deferred<List<LocatedTablet>>
call(GetTableLocationsResponsePB resp) {
@@ -1156,7 +1173,7 @@ public class AsyncKuduClient implements AutoCloseable {
* {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries,
its errback will
* be immediately called.
* @param rpc the RPC to retry later
- * @param ex the reason why we need to retry, might be null
+ * @param ex the reason why we need to retry
* @return a Deferred object to use if this method is called inline with the
user's original
* attempt to send the RPC. Can be ignored in any other context that doesn't
need to return a
* Deferred back to the user.
@@ -1171,6 +1188,15 @@ public class AsyncKuduClient implements AutoCloseable {
sendRpcToTablet(rpc);
}
}
+ assert (ex != null);
+ Status reasonForRetry = ex.getStatus();
+ rpc.addTrace(
+ new RpcTraceFrame.RpcTraceFrameBuilder(
+ rpc.method(),
+ RpcTraceFrame.Action.SLEEP_THEN_RETRY)
+ .callStatus(reasonForRetry)
+ .build());
+
long sleepTime = getSleepTimeForRpc(rpc);
if (cannotRetryRequest(rpc) ||
rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
// Don't let it retry.
@@ -1389,14 +1415,21 @@ public class AsyncKuduClient implements AutoCloseable {
/**
* Retrieve the master registration (see {@link
GetMasterRegistrationResponse}
* for a replica.
- * @param masterClient An initialized client for the master replica.
- * @return A Deferred object for the master replica's current registration.
+ * @param masterClient an initialized client for the master replica
+ * @param parentRpc RPC that prompted a master lookup, can be null
+ * @return a Deferred object for the master replica's current registration
*/
- Deferred<GetMasterRegistrationResponse> getMasterRegistration(TabletClient
masterClient) {
+ Deferred<GetMasterRegistrationResponse> getMasterRegistration(
+ TabletClient masterClient, KuduRpc<?> parentRpc) {
// TODO: Handle the situation when multiple in-flight RPCs all want to
query the masters,
// basically reuse in some way the master permits.
GetMasterRegistrationRequest rpc = new
GetMasterRegistrationRequest(masterTable);
- rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+ if (parentRpc != null) {
+
rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
+ rpc.setParentRpc(parentRpc);
+ } else {
+ rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+ }
Deferred<GetMasterRegistrationResponse> d = rpc.getDeferred();
rpc.attempt++;
masterClient.sendRpc(rpc);
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 3864088..8171ac1 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -789,7 +789,7 @@ public final class AsyncKuduScanner {
public String toString() {
return "ScanRequest(scannerId=" + Bytes.pretty(scannerId)
+ (tablet != null? ", tabletSlice=" + tablet.getTabletId() : "")
- + ", attempt=" + attempt + ')';
+ + ", attempt=" + attempt + ", " + super.toString() + ")";
}
@Override
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 8dacdd7..0f86a42 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -178,6 +178,7 @@ class Batch extends KuduRpc<BatchResponse> {
.add("operations", operations.size())
.add("tablet", tablet)
.add("ignoreAllDuplicateRows", ignoreAllDuplicateRows)
+ .add("rpc", super.toString())
.toString();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 5ae3f56..03fcce1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -25,6 +25,7 @@
*/
package org.apache.kudu.client;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -39,7 +40,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
@@ -61,12 +65,20 @@ import static
org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
@InterfaceAudience.Private
public abstract class KuduRpc<R> {
+ @VisibleForTesting
+ public static final int MAX_TRACES_SIZE = 100;
+
// Service names.
protected static final String MASTER_SERVICE_NAME =
"kudu.master.MasterService";
protected static final String TABLET_SERVER_SERVICE_NAME =
"kudu.tserver.TabletServerService";
private static final Logger LOG = LoggerFactory.getLogger(KuduRpc.class);
+ private final List<RpcTraceFrame> traces =
+ Collections.synchronizedList(new ArrayList<RpcTraceFrame>());
+
+ private KuduRpc<?> parentRpc;
+
/**
* Returns the partition key this RPC is for, or {@code null} if the RPC is
* not tablet specific.
@@ -203,10 +215,46 @@ public abstract class KuduRpc<R> {
sequenceId = RequestTracker.NO_SEQ_NO;
}
deadlineTracker.reset();
+ traces.clear();
+ parentRpc = null;
d.callback(result);
}
/**
+ * Add the provided trace to this RPC's collection of traces. If this RPC
has a parent RPC, it
+ * will also receive that trace. If this RPC has reached the limit of traces
it can track then
+ * the trace will just be discarded.
+ * @param rpcTraceFrame trace to add
+ */
+ void addTrace(RpcTraceFrame rpcTraceFrame) {
+ if (parentRpc != null) {
+ parentRpc.addTrace(rpcTraceFrame);
+ }
+
+ if (traces.size() == MAX_TRACES_SIZE) {
+ // Add a last trace that indicates that we've reached the max size.
+ traces.add(
+ new RpcTraceFrame.RpcTraceFrameBuilder(
+ this.method(),
+ RpcTraceFrame.Action.TRACE_TRUNCATED)
+ .build());
+ } else if (traces.size() < MAX_TRACES_SIZE) {
+ traces.add(rpcTraceFrame);
+ }
+ }
+
+ /**
+ * Sets the parent RPC, which will also receive every trace added to this RPC.
+ * An RPC can have only one parent RPC, and it cannot be its own parent (addTrace would recurse forever).
+ * @param parentRpc RPC that will also receive traces from this RPC
+ */
+ void setParentRpc(KuduRpc<?> parentRpc) {
+ assert (this.parentRpc == null);
+ assert (parentRpc != this);
+ this.parentRpc = parentRpc;
+ }
+
+ /**
* Package private way of making an RPC complete by giving it its result.
* If this RPC has no {@link Deferred} associated to it, nothing will
* happen. This may happen if the RPC was already called back.
@@ -267,6 +315,14 @@ public abstract class KuduRpc<R> {
return ReplicaSelection.LEADER_ONLY;
}
+ /**
+ * Get an immutable copy of the traces.
+ * @return list of traces
+ */
+ List<RpcTraceFrame> getImmutableTraces() {
+ return ImmutableList.copyOf(traces);
+ }
+
void setSequenceId(long sequenceId) {
assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
this.sequenceId = sequenceId;
@@ -289,6 +345,7 @@ public abstract class KuduRpc<R> {
// this method if DEBUG is enabled.
if (LOG.isDebugEnabled()) {
buf.append(", ").append(deferred);
+ buf.append(", ").append(traces);
}
buf.append(')');
return buf.toString();
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
new file mode 100644
index 0000000..cae2b02
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.common.base.MoreObjects;
+import org.apache.kudu.annotations.InterfaceAudience;
+
+/**
+ * Container class for traces. Most of its properties can be null, when they
aren't set via the
+ * builder. The timestamp is set automatically.
+ */
[email protected]
+class RpcTraceFrame {
+ enum Action {
+ // Just before putting the RPC on the wire.
+ SEND_TO_SERVER,
+ // Just after parsing the response from the server.
+ RECEIVE_FROM_SERVER,
+ // Just before sleeping and then retrying.
+ SLEEP_THEN_RETRY,
+ // After having figured out that we don't know where the RPC is going,
+ // before querying the master.
+ QUERY_MASTER,
+ // Once the trace becomes too large, will be the last trace object in the
list.
+ TRACE_TRUNCATED
+ }
+
+ private final String rpcMethod;
+ private final Action action;
+ private final ServerInfo serverInfo;
+ private final long timestampMs;
+ private final Status callStatus;
+
+ private RpcTraceFrame(String rpcMethod, Action action,
+ ServerInfo serverInfo, Status callStatus) {
+ this.rpcMethod = rpcMethod;
+ this.action = action;
+ this.serverInfo = serverInfo;
+ this.callStatus = callStatus;
+ this.timestampMs = System.currentTimeMillis();
+ }
+
+ public String getRpcMethod() {
+ return rpcMethod;
+ }
+
+ Action getAction() {
+ return action;
+ }
+
+ ServerInfo getServer() {
+ return serverInfo;
+ }
+
+ long getTimestampMs() {
+ return timestampMs;
+ }
+
+ public Status getStatus() {
+ return callStatus;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("rpcMethod", rpcMethod)
+ .add("timestampMs", timestampMs)
+ .add("action", action)
+ .add("serverInfo", serverInfo)
+ .add("callStatus", callStatus)
+ .toString();
+ }
+
+ /**
+ * Builder class for trace frames. The only required parameters are set in
the constructor.
+ * Timestamp is set automatically.
+ */
+ static class RpcTraceFrameBuilder {
+ private final String rpcMethod;
+ private final Action action;
+ private ServerInfo serverInfo;
+ private Status callStatus;
+
+ RpcTraceFrameBuilder(String rpcMethod, Action action) {
+ this.rpcMethod = rpcMethod;
+ this.action = action;
+ }
+
+ public RpcTraceFrameBuilder serverInfo(ServerInfo serverInfo) {
+ this.serverInfo = serverInfo;
+ return this;
+ }
+
+ public RpcTraceFrameBuilder callStatus(Status callStatus) {
+ this.callStatus = callStatus;
+ return this;
+ }
+
+ public RpcTraceFrame build() {
+ return new RpcTraceFrame(rpcMethod, action, serverInfo, callStatus);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 1b6f5ac..813f02b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -153,6 +153,13 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
}
<R> void sendRpc(KuduRpc<R> rpc) {
+ rpc.addTrace(
+ new RpcTraceFrame.RpcTraceFrameBuilder(
+ rpc.method(),
+ RpcTraceFrame.Action.SEND_TO_SERVER)
+ .serverInfo(serverInfo)
+ .build());
+
if (!rpc.deadlineTracker.hasDeadline()) {
LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout
" + rpc);
}
@@ -420,6 +427,13 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
throw new NonRecoverableException(statusIllegalState);
}
+ // Start building the trace, we'll finish it as we parse the response.
+ RpcTraceFrame.RpcTraceFrameBuilder traceBuilder =
+ new RpcTraceFrame.RpcTraceFrameBuilder(
+ rpc.method(),
+ RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
+ .serverInfo(serverInfo);
+
Pair<Object, Object> decoded = null;
KuduException exception = null;
Status retryableHeaderError = Status.OK();
@@ -463,6 +477,7 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
// This check is specifically for the ERROR_SERVER_TOO_BUSY case above.
if (!retryableHeaderError.ok()) {
+ rpc.addTrace(traceBuilder.callStatus(retryableHeaderError).build());
kuduClient.handleRetryableError(rpc, new
RecoverableException(retryableHeaderError));
return null;
}
@@ -473,7 +488,7 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
if (decoded != null) {
if (decoded.getSecond() instanceof Tserver.TabletServerErrorPB) {
Tserver.TabletServerErrorPB error = (Tserver.TabletServerErrorPB)
decoded.getSecond();
- exception = dispatchTSErrorOrReturnException(rpc, error);
+ exception = dispatchTSErrorOrReturnException(rpc, error, traceBuilder);
if (exception == null) {
// It was taken care of.
return null;
@@ -484,7 +499,7 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
} else if (decoded.getSecond() instanceof Master.MasterErrorPB) {
Master.MasterErrorPB error = (Master.MasterErrorPB)
decoded.getSecond();
- exception = dispatchMasterErrorOrReturnException(rpc, error);
+ exception = dispatchMasterErrorOrReturnException(rpc, error,
traceBuilder);
if (exception == null) {
// Exception was taken care of.
return null;
@@ -500,11 +515,13 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
if (kuduClient.isStatisticsEnabled()) {
rpc.updateStatistics(kuduClient.getStatistics(), decoded.getFirst());
}
+ rpc.addTrace(traceBuilder.callStatus(Status.OK()).build());
rpc.callback(decoded.getFirst());
} else {
if (kuduClient.isStatisticsEnabled()) {
rpc.updateStatistics(kuduClient.getStatistics(), null);
}
+ rpc.addTrace(traceBuilder.callStatus(exception.getStatus()).build());
rpc.errback(exception);
}
} catch (Exception e) {
@@ -525,8 +542,9 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
* @param error the error the TS sent
* @return an exception if we couldn't dispatch the error, or null
*/
- private KuduException dispatchTSErrorOrReturnException(KuduRpc rpc,
-
Tserver.TabletServerErrorPB error) {
+ private KuduException dispatchTSErrorOrReturnException(
+ KuduRpc rpc, Tserver.TabletServerErrorPB error,
+ RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
Status status = Status.fromTabletServerErrorPB(error);
if (error.getCode() == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
@@ -541,6 +559,7 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
} else {
return new NonRecoverableException(status);
}
+ rpc.addTrace(traceBuilder.callStatus(status).build());
return null;
}
@@ -551,8 +570,8 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
* @param error the error the master sent
* @return an exception if we couldn't dispatch the error, or null
*/
- private KuduException dispatchMasterErrorOrReturnException(KuduRpc rpc,
-
Master.MasterErrorPB error) {
+ private KuduException dispatchMasterErrorOrReturnException(
+ KuduRpc rpc, Master.MasterErrorPB error,
RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
Status status = Status.fromMasterErrorPB(error);
if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
@@ -571,6 +590,7 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
} else {
return new NonRecoverableException(status);
}
+ rpc.addTrace(traceBuilder.callStatus(status).build());
return null;
}
@@ -735,6 +755,14 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
*/
private void failOrRetryRpc(final KuduRpc<?> rpc,
final RecoverableException exception) {
+ rpc.addTrace(
+ new RpcTraceFrame.RpcTraceFrameBuilder(
+ rpc.method(),
+ RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
+ .serverInfo(serverInfo)
+ .callStatus(exception.getStatus())
+ .build());
+
RemoteTablet tablet = rpc.getTablet();
// Note As of the time of writing (03/11/16), a null tablet doesn't make
sense, if we see a null
// tablet it's because we didn't set it properly before calling sendRpc().
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 8989f50..d30762c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -380,7 +380,8 @@ public class BaseKuduTest {
Stopwatch sw = Stopwatch.createStarted();
int leaderPort = -1;
while (leaderPort == -1 && sw.elapsed(TimeUnit.MILLISECONDS) <
DEFAULT_SLEEP) {
- Deferred<Master.GetTableLocationsResponsePB> masterLocD =
client.getMasterTableLocationsPB();
+ Deferred<Master.GetTableLocationsResponsePB> masterLocD =
+ client.getMasterTableLocationsPB(null);
Master.GetTableLocationsResponsePB r = masterLocD.join(DEFAULT_SLEEP);
leaderPort = r.getTabletLocations(0)
.getReplicas(0)
http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java
new file mode 100644
index 0000000..6e8842c
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestRpcTraces {
+
+ @Test
+ public void testLimit() {
+ PingRequest ping = PingRequest.makeMasterPingRequest();
+
+ ping.addTrace(getTrace());
+ assertNotTruncated(ping);
+
+ for (int i = 0; i < KuduRpc.MAX_TRACES_SIZE - 2; i++) {
+ ping.addTrace(getTrace());
+ }
+ assertNotTruncated(ping);
+
+ ping.addTrace(getTrace());
+ assertNotTruncated(ping);
+
+ ping.addTrace(getTrace());
+ assertTruncateIsLast(ping);
+
+ ping.addTrace(getTrace());
+ assertTruncateIsLast(ping);
+ }
+
+ @Test
+ public void testParentRpc() {
+ PingRequest parent = PingRequest.makeMasterPingRequest();
+
+ PingRequest daughter = PingRequest.makeMasterPingRequest();
+ PingRequest son = PingRequest.makeMasterPingRequest();
+
+ PingRequest sonsDaughter = PingRequest.makeMasterPingRequest();
+
+ sonsDaughter.setParentRpc(son);
+ son.setParentRpc(parent);
+ daughter.setParentRpc(parent);
+
+ // Son's daughter => son => parent.
+ RpcTraceFrame trace = getTrace();
+ sonsDaughter.addTrace(trace);
+ assertTrue(son.getImmutableTraces().get(0) == trace);
+ assertTrue(parent.getImmutableTraces().get(0) == trace);
+ assertTrue(daughter.getImmutableTraces().isEmpty());
+
+ // Son => parent.
+ trace = getTrace();
+ son.addTrace(trace);
+ assertTrue(son.getImmutableTraces().get(1) == trace);
+ assertTrue(parent.getImmutableTraces().get(1) == trace);
+ assertTrue(daughter.getImmutableTraces().isEmpty());
+ assertEquals(1, sonsDaughter.getImmutableTraces().size());
+
+ // Daughter => parent.
+ trace = getTrace();
+ daughter.addTrace(trace);
+ assertTrue(daughter.getImmutableTraces().get(0) == trace);
+ assertTrue(parent.getImmutableTraces().get(2) == trace);
+ assertEquals(2, son.getImmutableTraces().size());
+ assertEquals(1, sonsDaughter.getImmutableTraces().size());
+
+ // Parent alone.
+ trace = getTrace();
+ parent.addTrace(trace);
+ assertTrue(parent.getImmutableTraces().get(3) == trace);
+ assertEquals(1, daughter.getImmutableTraces().size());
+ assertEquals(2, son.getImmutableTraces().size());
+ assertEquals(1, sonsDaughter.getImmutableTraces().size());
+ }
+
+ private RpcTraceFrame getTrace() {
+ return new RpcTraceFrame.RpcTraceFrameBuilder(
+ "trace",
+ RpcTraceFrame.Action.QUERY_MASTER) // Just a random action.
+ .build();
+ }
+
+ private void assertNotTruncated(KuduRpc<?> rpc) {
+ for (RpcTraceFrame trace : rpc.getImmutableTraces()) {
+ assertNotEquals(RpcTraceFrame.Action.TRACE_TRUNCATED, trace.getAction());
+ }
+ }
+
+ private void assertTruncateIsLast(KuduRpc<?> rpc) {
+ List<RpcTraceFrame> traces = rpc.getImmutableTraces();
+ assertEquals(KuduRpc.MAX_TRACES_SIZE + 1, traces.size());
+ for (int i = 0; i < traces.size() - 1; i++) {
+ assertNotEquals(RpcTraceFrame.Action.TRACE_TRUNCATED,
traces.get(i).getAction());
+ }
+ assertEquals(RpcTraceFrame.Action.TRACE_TRUNCATED,
traces.get(traces.size() - 1).getAction());
+ }
+}