Add RPC delay warning.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3698377d Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3698377d Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3698377d Branch: refs/heads/merge_2015_05_14 Commit: 3698377d8d90ae8906834e99aee9babccbed9c55 Parents: f0d0526 Author: Jacques Nadeau <[email protected]> Authored: Thu May 14 12:19:46 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 14 12:19:46 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/drill/exec/rpc/RpcBus.java | 11 +++++++++++ 1 file changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/3698377d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 92ce312..0271c12 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -31,12 +31,14 @@ import java.io.Closeable; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; @@ -203,8 +205,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp 
logger.debug("Received message {}", msg); } final Channel channel = connection.getChannel(); + final Stopwatch watch = new Stopwatch().start(); try{ + switch (msg.mode) { case REQUEST: { // handle message and ack. @@ -270,6 +274,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp throw new UnsupportedOperationException(); } } finally { + long time = watch.elapsed(TimeUnit.MILLISECONDS); + long delayThreshold = Integer.parseInt(System.getProperty("drill.exec.rpcDelayWarning", "50")); + if (time > delayThreshold) { + logger.warn(String.format( + "Message of mode %d of rpc type %d took longer than %dms. Actual duration was %dms.", + msg.mode, msg.rpcType, delayThreshold, time)); + } msg.release(); } }
