In the case of extended RPC message handling, write warning to log
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7fccf7e1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7fccf7e1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7fccf7e1 Branch: refs/heads/master Commit: 7fccf7e1fd9f108edbd5bf177339173d900b89a3 Parents: 3c87997 Author: Jacques Nadeau <[email protected]> Authored: Thu May 14 12:19:46 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 14 21:58:53 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/drill/exec/rpc/RpcBus.java | 11 +++++++++++ 1 file changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/7fccf7e1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 92ce312..1a23724 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -31,12 +31,14 @@ import java.io.Closeable; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; @@ -203,8 +205,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp logger.debug("Received message {}", msg); } final Channel channel = connection.getChannel(); + final Stopwatch watch = new Stopwatch().start(); try{ + switch (msg.mode) { case REQUEST: { // handle message and ack. @@ -270,6 +274,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp throw new UnsupportedOperationException(); } } finally { + long time = watch.elapsed(TimeUnit.MILLISECONDS); + long delayThreshold = Integer.parseInt(System.getProperty("drill.exec.rpcDelayWarning", "500")); + if (time > delayThreshold) { + logger.warn(String.format( + "Message of mode %s of rpc type %d took longer than %dms. Actual duration was %dms.", + msg.mode, msg.rpcType, delayThreshold, time)); + } msg.release(); } }
