Repository: kafka Updated Branches: refs/heads/trunk d9b784e14 -> 5781feb52
KAFKA-3182; Fix testSocketsCloseOnShutdown transient failures * Turned off Nagle on the sending sockets to force the socket to physically acknowledge after the first write in `sendRequest` * Added a `200ms` delay between write attempts (necessary on Linux, but not Mac) Author: Armin Braun <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2632 from original-brownbear/KAFKA-3182 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5781feb5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5781feb5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5781feb5 Branch: refs/heads/trunk Commit: 5781feb527766fbb3620cb601ed453dd8086e0c2 Parents: d9b784e Author: Armin Braun <[email protected]> Authored: Sat Mar 4 15:22:06 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Sat Mar 4 15:28:35 2017 +0000 ---------------------------------------------------------------------- core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5781feb5/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 3875604..17056c9 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -181,13 +181,16 @@ class SocketServerTest extends JUnitSuite { def testSocketsCloseOnShutdown() { // open a connection val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + plainSocket.setTcpNoDelay(true) val traceSocket = connect(protocol = SecurityProtocol.TRACE) + traceSocket.setTcpNoDelay(true) val bytes = new Array[Byte](40) // send a request first to make sure the connection has been picked up by the socket server sendRequest(plainSocket, bytes, Some(0)) sendRequest(traceSocket, bytes, Some(0)) processRequest(server.requestChannel) - + // the following sleep is necessary to reliably detect the connection close when we send data below + Thread.sleep(200L) // make sure the sockets are open server.acceptors.values.map(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) // then shutdown the server
