This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d529d86aa4b KAFKA-13559: Fix issue where responses intermittently 
takes 300+ ms to respond, even when the server is idle. (#12416)
d529d86aa4b is described below

commit d529d86aa4be533d1251cfc0b4c0fb57c69ace72
Author: Badai Aqrandista <[email protected]>
AuthorDate: Mon Aug 15 21:34:03 2022 +1000

    KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to 
respond, even when the server is idle. (#12416)
    
    Ensures that SSL buffered data is processed by server immediately on the 
next poll when channel is unmuted after processing previous request. Poll 
timeout is reset to zero for this case to avoid 300ms delay in poll() if no new 
data arrives on the sockets.
    
    Reviewers: David Mao <[email protected]>, Ismael Juma <[email protected]>, 
Rajini Sivaram <[email protected]>
---
 .../org/apache/kafka/common/network/Selector.java  |  1 +
 .../unit/kafka/network/SocketServerTest.scala      | 44 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index bd1175a8ee0..2e581187625 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -757,6 +757,7 @@ public class Selector implements Selectable, AutoCloseable {
             explicitlyMutedChannels.remove(channel);
             if (channel.hasBytesBuffered()) {
                 keysWithBufferedRead.add(channel.selectionKey());
+                madeReadProgressLastPoll = true;
             }
         }
     }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 98f92d61ff2..801a2d83cab 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1878,6 +1878,44 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Test to ensure "Selector.poll()" does not block at "select(timeout)" when 
there is no data in the socket but there
+   * is data in the buffer. This only happens when SSL protocol is used.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    props ++= sslServerProps
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeoutMs = 5000
+    // set pollTimeoutOverride to "selectTimeoutMs" to ensure poll() timeout 
is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeoutMs)
+
+    try {
+      // initiate SSL connection by sending 1 request via socket, then send 2 
requests directly into the netReadBuffer
+      val (sslSocket, req1) = makeSocketWithBufferedRequests(testableServer, 
testableSelector, proxyServer)
+
+      // force all data to be transferred to the kafka broker by closing the 
client connection to proxy server
+      sslSocket.close()
+      TestUtils.waitUntilTrue(() => proxyServer.clientConnSocket.isClosed, 
"proxyServer.clientConnSocket is still not closed after 60000 ms", 60000)
+
+      // process the request and send the response
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // process the requests in the netReadBuffer, this should not block
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)
+
+    } finally {
+      proxyServer.close()
+      shutdownServerAndMetrics(testableServer)
+    }
+  }
+
   private def sslServerProps: Properties = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
     val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
@@ -2044,10 +2082,12 @@ class SocketServerTest {
     }
 
     def testableSelector: TestableSelector =
-      
dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+      testableProcessor.selector.asInstanceOf[TestableSelector]
 
-    def testableProcessor: TestableProcessor =
+    def testableProcessor: TestableProcessor = {
+      val endpoint = this.config.dataPlaneListeners.head
       
dataPlaneAcceptors.get(endpoint).processors(0).asInstanceOf[TestableProcessor]
+    }
 
     def waitForChannelClose(connectionId: String, locallyClosed: Boolean): 
Unit = {
       val selector = testableSelector

Reply via email to