Repository: kafka
Updated Branches:
  refs/heads/trunk 71417552c -> e6164ec61


MINOR: Fix transient failure in SocketServerTest.testConnectionIdReuse

Two requests sent together may not always trigger a staged receive, since the
requests may not be received in a single poll and the channel is muted when
receives are complete. Hence the test now keeps sending requests until a
receive is actually staged, which makes it more stable.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apu...@confluent.io>

Closes #3712 from rajinisivaram/MINOR-connectionidreuse-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6164ec6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6164ec6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6164ec6

Branch: refs/heads/trunk
Commit: e6164ec611ca48e71c549f88d8a92fef90bf5a71
Parents: 7141755
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Thu Aug 24 22:12:50 2017 -0700
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu Aug 24 22:12:50 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/network/Selector.java   |  2 +-
 .../main/scala/kafka/network/SocketServer.scala |  4 ++
 .../unit/kafka/network/SocketServerTest.scala   | 44 +++++++++++++-------
 3 files changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6164ec6/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 3e976f3..e873252 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -802,7 +802,7 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     // only for testing
-    int numStagedReceives(KafkaChannel channel) {
+    public int numStagedReceives(KafkaChannel channel) {
         Deque<NetworkReceive> deque = stagedReceives.get(channel);
         return deque == null ? 0 : deque.size();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6164ec6/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 842c6a0..e6f6662 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -639,6 +639,10 @@ private[kafka] class Processor(val id: Int,
   private[network] def channel(connectionId: String): Option[KafkaChannel] =
     Option(selector.channel(connectionId))
 
+  // Visible for testing
+  private[network] def numStagedReceives(connectionId: String): Int =
+    openOrClosingChannel(connectionId).map(c => 
selector.numStagedReceives(c)).getOrElse(0)
+
   /**
    * Wakeup the thread for selection.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6164ec6/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index edb814d..e59ce58 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -67,7 +67,7 @@ class SocketServerTest extends JUnitSuite {
   server.startup()
   val sockets = new ArrayBuffer[Socket]
 
-  def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = 
None) {
+  def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = 
None, flush: Boolean = true) {
     val outgoing = new DataOutputStream(socket.getOutputStream)
     id match {
       case Some(id) =>
@@ -77,7 +77,8 @@ class SocketServerTest extends JUnitSuite {
         outgoing.writeInt(request.length)
     }
     outgoing.write(request)
-    outgoing.flush()
+    if (flush)
+      outgoing.flush()
   }
 
   def receiveResponse(socket: Socket): Array[Byte] = {
@@ -233,13 +234,11 @@ class SocketServerTest extends JUnitSuite {
 
       // Connection with staged receives
       val socket2 = connect(overrideServer, protocol = 
SecurityProtocol.PLAINTEXT)
-      sendRequest(socket2, serializedBytes)
-      sendRequest(socket2, serializedBytes)
-      val request2 = overrideServer.requestChannel.receiveRequest(2000)
+      val request2 = sendRequestsUntilStagedReceive(overrideServer, socket2, 
serializedBytes)
 
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to 
close idle channel")
-      assertTrue("Channel removed without processing staging receives", 
openOrClosingChannel(request2).nonEmpty)
+      TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, 
"Channel removed without processing staged receives")
       processRequest(overrideServer.requestChannel, request2) // this triggers 
a failed send since channel has been closed
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, 
"Failed to remove channel with failed sends")
       assertNull("Received request after failed send", 
overrideServer.requestChannel.receiveRequest(200))
@@ -274,7 +273,7 @@ class SocketServerTest extends JUnitSuite {
     try {
       overrideServer.startup()
       val socket1 = connect(overrideServer)
-      TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to create 
channel")
+      TestUtils.waitUntilTrue(() => connectionCount == 1 && 
openChannel.isDefined, "Failed to create channel")
       val channel1 = openChannel.getOrElse(throw new RuntimeException("Channel 
not found"))
 
       // Create new connection with same id when `channel1` is still open and 
in Selector.channels
@@ -283,14 +282,13 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close 
channel")
       assertSame(channel1, openChannel.getOrElse(throw new 
RuntimeException("Channel not found")))
 
-      // Send a request to `channel1` and advance time beyond idle time so 
that `channel1` is
+      // Send requests to `channel1` until a receive is staged and advance 
time beyond idle time so that `channel1` is
       // closed with staged receives and is in Selector.closingChannels
       val serializedBytes = producerRequestBytes
-      (1 to 3).foreach(_ => sendRequest(socket1, serializedBytes))
-      val request = overrideServer.requestChannel.receiveRequest(2000)
+      val request = sendRequestsUntilStagedReceive(overrideServer, socket1, 
serializedBytes)
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not 
closed")
-      assertTrue("Channel removed without processing staging receives", 
openOrClosingChannel.nonEmpty)
+      TestUtils.waitUntilTrue(() => openOrClosingChannel.isDefined, "Channel 
removed without processing staged receives")
 
       // Create new connection with same id when `channel1` is in 
Selector.closingChannels
       // Check that new connection is closed and openOrClosingChannel still 
contains `channel1`
@@ -300,12 +298,11 @@ class SocketServerTest extends JUnitSuite {
 
       // Complete request with failed send so that `channel1` is removed from 
Selector.closingChannels
       processRequest(overrideServer.requestChannel, request)
-      TestUtils.waitUntilTrue(() => connectionCount == 0, "Failed to remove 
channel with failed send")
-      assertTrue("Channel not removed", openOrClosingChannel.isEmpty)
+      TestUtils.waitUntilTrue(() => connectionCount == 0 && 
openOrClosingChannel.isEmpty, "Failed to remove channel with failed send")
 
       // Check that new connections can be created with the same id since 
`channel1` is no longer in Selector
       connect(overrideServer)
-      TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to open new 
channel")
+      TestUtils.waitUntilTrue(() => connectionCount == 1 && 
openChannel.isDefined, "Failed to open new channel")
       val newChannel = openChannel.getOrElse(throw new 
RuntimeException("Channel not found"))
       assertNotSame(channel1, newChannel)
       newChannel.disconnect()
@@ -316,6 +313,25 @@ class SocketServerTest extends JUnitSuite {
     }
   }
 
+  private def sendRequestsUntilStagedReceive(server: SocketServer, socket: 
Socket, requestBytes: Array[Byte]): RequestChannel.Request = {
+    def sendTwoRequestsReceiveOne(): RequestChannel.Request = {
+      sendRequest(socket, requestBytes, flush = false)
+      sendRequest(socket, requestBytes, flush = true)
+      server.requestChannel.receiveRequest(2000)
+    }
+    val (request, hasStagedReceives) = 
TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req =>
+      val connectionId = req.connectionId
+      val hasStagedReceives = 
server.processor(0).numStagedReceives(connectionId) > 0
+      if (!hasStagedReceives) {
+        processRequest(server.requestChannel, req)
+        processRequest(server.requestChannel)
+      }
+      hasStagedReceives
+    }
+    assertTrue(s"Receives not staged for 
${org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS} ms", hasStagedReceives)
+    request
+  }
+
   @Test
   def testSocketsCloseOnShutdown() {
     // open a connection

Reply via email to