Repository: kafka Updated Branches: refs/heads/trunk 71417552c -> e6164ec61
MINOR: Fix transient failure in SocketServerTest.testConnectionIdReuse Two requests sent together may not always trigger a staged receive since the requests may not be received in a single poll and the channel is muted when receives are complete. Hence attempt to stage multiple times until a receive is staged to make the test more stable. Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apu...@confluent.io> Closes #3712 from rajinisivaram/MINOR-connectionidreuse-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6164ec6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6164ec6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6164ec6 Branch: refs/heads/trunk Commit: e6164ec611ca48e71c549f88d8a92fef90bf5a71 Parents: 7141755 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Thu Aug 24 22:12:50 2017 -0700 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Thu Aug 24 22:12:50 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/common/network/Selector.java | 2 +- .../main/scala/kafka/network/SocketServer.scala | 4 ++ .../unit/kafka/network/SocketServerTest.scala | 44 +++++++++++++------- 3 files changed, 35 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e6164ec6/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 3e976f3..e873252 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -802,7 +802,7 @@ public class Selector implements Selectable, AutoCloseable { } // only for testing - int numStagedReceives(KafkaChannel channel) { + public int numStagedReceives(KafkaChannel channel) { Deque<NetworkReceive> deque = stagedReceives.get(channel); return deque == null ? 0 : deque.size(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e6164ec6/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 842c6a0..e6f6662 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -639,6 +639,10 @@ private[kafka] class Processor(val id: Int, private[network] def channel(connectionId: String): Option[KafkaChannel] = Option(selector.channel(connectionId)) + // Visible for testing + private[network] def numStagedReceives(connectionId: String): Int = + openOrClosingChannel(connectionId).map(c => selector.numStagedReceives(c)).getOrElse(0) + /** * Wakeup the thread for selection. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/e6164ec6/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index edb814d..e59ce58 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -67,7 +67,7 @@ class SocketServerTest extends JUnitSuite { server.startup() val sockets = new ArrayBuffer[Socket] - def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) { + def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true) { val outgoing = new DataOutputStream(socket.getOutputStream) id match { case Some(id) => @@ -77,7 +77,8 @@ class SocketServerTest extends JUnitSuite { outgoing.writeInt(request.length) } outgoing.write(request) - outgoing.flush() + if (flush) + outgoing.flush() } def receiveResponse(socket: Socket): Array[Byte] = { @@ -233,13 +234,11 @@ class SocketServerTest extends JUnitSuite { // Connection with staged receives val socket2 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) - sendRequest(socket2, serializedBytes) - sendRequest(socket2, serializedBytes) - val request2 = overrideServer.requestChannel.receiveRequest(2000) + val request2 = sendRequestsUntilStagedReceive(overrideServer, socket2, serializedBytes) time.sleep(idleTimeMs + 1) TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to close idle channel") - assertTrue("Channel removed without processing staging receives", openOrClosingChannel(request2).nonEmpty) + TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, "Channel removed without processing staged receives") processRequest(overrideServer.requestChannel, request2) // this triggers a failed send since channel has been closed TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, "Failed to remove channel with failed sends") assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200)) @@ -274,7 +273,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() val socket1 = connect(overrideServer) - TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to create channel") + TestUtils.waitUntilTrue(() => connectionCount == 1 && openChannel.isDefined, "Failed to create channel") val channel1 = openChannel.getOrElse(throw new RuntimeException("Channel not found")) // Create new connection with same id when `channel1` is still open and in Selector.channels @@ -283,14 +282,13 @@ class SocketServerTest extends JUnitSuite { TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel") assertSame(channel1, openChannel.getOrElse(throw new RuntimeException("Channel not found"))) - // Send a request to `channel1` and advance time beyond idle time so that `channel1` is + // Send requests to `channel1` until a receive is staged and advance time beyond idle time so that `channel1` is // closed with staged receives and is in Selector.closingChannels val serializedBytes = producerRequestBytes - (1 to 3).foreach(_ => sendRequest(socket1, serializedBytes)) - val request = overrideServer.requestChannel.receiveRequest(2000) + val request = sendRequestsUntilStagedReceive(overrideServer, socket1, serializedBytes) time.sleep(idleTimeMs + 1) TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed") - assertTrue("Channel removed without processing staging receives", openOrClosingChannel.nonEmpty) + TestUtils.waitUntilTrue(() => openOrClosingChannel.isDefined, "Channel removed without processing staged receives") // Create new connection with same id when when `channel1` is in Selector.closingChannels // Check that new connection is closed and openOrClosingChannel still contains `channel1` @@ -300,12 +298,11 @@ class SocketServerTest extends JUnitSuite { // Complete request with failed send so that `channel1` is removed from Selector.closingChannels processRequest(overrideServer.requestChannel, request) - TestUtils.waitUntilTrue(() => connectionCount == 0, "Failed to remove channel with failed send") - assertTrue("Channel not removed", openOrClosingChannel.isEmpty) + TestUtils.waitUntilTrue(() => connectionCount == 0 && openOrClosingChannel.isEmpty, "Failed to remove channel with failed send") // Check that new connections can be created with the same id since `channel1` is no longer in Selector connect(overrideServer) - TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to open new channel") + TestUtils.waitUntilTrue(() => connectionCount == 1 && openChannel.isDefined, "Failed to open new channel") val newChannel = openChannel.getOrElse(throw new RuntimeException("Channel not found")) assertNotSame(channel1, newChannel) newChannel.disconnect() @@ -316,6 +313,25 @@ class SocketServerTest extends JUnitSuite { } } + private def sendRequestsUntilStagedReceive(server: SocketServer, socket: Socket, requestBytes: Array[Byte]): RequestChannel.Request = { + def sendTwoRequestsReceiveOne(): RequestChannel.Request = { + sendRequest(socket, requestBytes, flush = false) + sendRequest(socket, requestBytes, flush = true) + server.requestChannel.receiveRequest(2000) + } + val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req => + val connectionId = req.connectionId + val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0 + if (!hasStagedReceives) { + processRequest(server.requestChannel, req) + processRequest(server.requestChannel) + } + hasStagedReceives + } + assertTrue(s"Receives not staged for ${org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS} ms", hasStagedReceives) + request + } + @Test def testSocketsCloseOnShutdown() { // open a connection