Repository: kafka
Updated Branches:
  refs/heads/trunk d0ce0a95d -> cf94b188f
MINOR: Use correct connectionId in SocketServer log message

Also add connection id to KafkaChannel exception message

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3529 from rajinisivaram/MINOR-log-connection-id

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf94b188
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf94b188
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf94b188

Branch: refs/heads/trunk
Commit: cf94b188f1678276a4a6df6b87d89f2d4d2e922c
Parents: d0ce0a9
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Mon Jul 17 09:40:55 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon Jul 17 09:41:29 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/network/KafkaChannel.java | 2 +-
 core/src/main/scala/kafka/network/SocketServer.scala | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 5e3a895..b563c4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -152,7 +152,7 @@ public class KafkaChannel {

     public void setSend(Send send) {
         if (this.send != null)
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
         this.send = send;
         this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 9088eb5..2ba5553 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -482,16 +482,17 @@ private[kafka] class Processor(val id: Int,

   /* `protected` for test usage */
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
-    trace(s"Socket server received response to send, registering for write and sending data: $response")
-    val channel = selector.channel(responseSend.destination)
+    val connectionId = response.request.connectionId
+    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
+    val channel = selector.channel(connectionId)
     // `channel` can be null if the selector closed the connection because it was idle for too long
     if (channel == null) {
-      warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
+      warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
       response.request.updateRequestMetrics(0L)
     }
     else {
       selector.send(responseSend)
-      inflightResponses += (response.request.connectionId -> response)
+      inflightResponses += (connectionId -> response)
     }
   }
