This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 61530da  KAFKA-9190; Close connections with expired authentication 
sessions (#7723)
61530da is described below

commit 61530da45bd97c18786024f43c999981ca2d6479
Author: Ron Dagostino <[email protected]>
AuthorDate: Wed Dec 4 08:39:39 2019 -0800

    KAFKA-9190; Close connections with expired authentication sessions (#7723)
    
    This patch fixes a bug in `SocketServer` in the expiration of connections 
which have not re-authenticated quickly enough. Previously these connections 
were left hanging, but now they are properly closed and cleaned up. This was 
one cause of the flaky test failures in 
`EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl`.
    
    Reviewers: Jason Gustafson<[email protected]>, Rajini Sivaram 
<[email protected]>
---
 .../apache/kafka/common/network/KafkaChannel.java  |  6 +-
 .../authenticator/SaslServerAuthenticator.java     | 17 ++---
 .../main/scala/kafka/network/SocketServer.scala    |  5 +-
 .../unit/kafka/network/SocketServerTest.scala      | 85 ++++++++++++++++++++--
 4 files changed, 94 insertions(+), 19 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 3bca276..df85e97 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -571,8 +571,8 @@ public class KafkaChannel implements AutoCloseable {
          * We've delayed getting the time as long as possible in case we don't 
need it,
          * but at this point we need it -- so get it now.
          */
-        long nowNanos = nowNanosSupplier.get().longValue();
-        if (nowNanos < 
authenticator.clientSessionReauthenticationTimeNanos().longValue())
+        long nowNanos = nowNanosSupplier.get();
+        if (nowNanos < authenticator.clientSessionReauthenticationTimeNanos())
             return false;
         swapAuthenticatorsAndBeginReauthentication(new 
ReauthenticationContext(authenticator, receive, nowNanos));
         receive = null;
@@ -605,7 +605,7 @@ public class KafkaChannel implements AutoCloseable {
      */
     public boolean serverAuthenticationSessionExpired(long nowNanos) {
         Long serverSessionExpirationTimeNanos = 
authenticator.serverSessionExpirationTimeNanos();
-        return serverSessionExpirationTimeNanos != null && nowNanos - 
serverSessionExpirationTimeNanos.longValue() > 0;
+        return serverSessionExpirationTimeNanos != null && nowNanos - 
serverSessionExpirationTimeNanos > 0;
     }
     
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 1e62e7f..a22edae 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -663,30 +663,29 @@ public class SaslServerAuthenticator implements 
Authenticator {
             Long connectionsMaxReauthMs = 
connectionsMaxReauthMsByMechanism.get(saslMechanism);
             if (credentialExpirationMs != null || connectionsMaxReauthMs != 
null) {
                 if (credentialExpirationMs == null)
-                    retvalSessionLifetimeMs = 
zeroIfNegative(connectionsMaxReauthMs.longValue());
+                    retvalSessionLifetimeMs = 
zeroIfNegative(connectionsMaxReauthMs);
                 else if (connectionsMaxReauthMs == null)
-                    retvalSessionLifetimeMs = 
zeroIfNegative(credentialExpirationMs.longValue() - authenticationEndMs);
+                    retvalSessionLifetimeMs = 
zeroIfNegative(credentialExpirationMs - authenticationEndMs);
                 else
                     retvalSessionLifetimeMs = zeroIfNegative(
-                            Math.min(credentialExpirationMs.longValue() - 
authenticationEndMs,
-                                    connectionsMaxReauthMs.longValue()));
+                            Math.min(credentialExpirationMs - 
authenticationEndMs,
+                                    connectionsMaxReauthMs));
                 if (retvalSessionLifetimeMs > 0L)
-                    sessionExpirationTimeNanos = Long
-                            .valueOf(authenticationEndNanos + 1000 * 1000 * 
retvalSessionLifetimeMs);
+                    sessionExpirationTimeNanos = authenticationEndNanos + 1000 
* 1000 * retvalSessionLifetimeMs;
             }
             if (credentialExpirationMs != null) {
                 if (sessionExpirationTimeNanos != null)
                     LOG.debug(
                             "Authentication complete; session max lifetime 
from broker config={} ms, credential expiration={} ({} ms); session expiration 
= {} ({} ms), sending {} ms to client",
                             connectionsMaxReauthMs, new 
Date(credentialExpirationMs),
-                            Long.valueOf(credentialExpirationMs.longValue() - 
authenticationEndMs),
+                            credentialExpirationMs - authenticationEndMs,
                             new Date(authenticationEndMs + 
retvalSessionLifetimeMs), retvalSessionLifetimeMs,
                             retvalSessionLifetimeMs);
                 else
                     LOG.debug(
                             "Authentication complete; session max lifetime 
from broker config={} ms, credential expiration={} ({} ms); no session 
expiration, sending 0 ms to client",
                             connectionsMaxReauthMs, new 
Date(credentialExpirationMs),
-                            Long.valueOf(credentialExpirationMs.longValue() - 
authenticationEndMs));
+                            credentialExpirationMs - authenticationEndMs);
             } else {
                 if (sessionExpirationTimeNanos != null)
                     LOG.debug(
@@ -706,7 +705,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
                 return null;
             // record at least 1 ms if there is some latency
             long latencyNanos = authenticationEndNanos - 
reauthenticationBeginNanos;
-            return latencyNanos == 0L ? 0L : Math.max(1L, 
Long.valueOf(Math.round(latencyNanos / 1000.0 / 1000.0)));
+            return latencyNanos == 0L ? 0L : Math.max(1L, 
Math.round(latencyNanos / 1000.0 / 1000.0));
         }
 
         private long zeroIfNegative(long value) {
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index d8c4a74..193d484 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -847,8 +847,9 @@ private[kafka] class Processor(val id: Int,
             else {
               val nowNanos = time.nanoseconds()
               if (channel.serverAuthenticationSessionExpired(nowNanos)) {
-                channel.disconnect()
-                debug(s"Disconnected expired channel: $channel : $header")
+                // be sure to decrease connection count and drop any in-flight 
responses
+                debug(s"Disconnecting expired channel: $channel : $header")
+                close(channel.id)
                 expiredConnectionsKilledCount.record(null, 1, 0)
               } else {
                 val connectionId = receive.source
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9ccb3f5..b0dc052 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -26,19 +26,20 @@ import java.util.{HashMap, Properties, Random}
 import com.yammer.metrics.core.{Gauge, Meter}
 import com.yammer.metrics.{Metrics => YammerMetrics}
 import javax.net.ssl._
-
 import kafka.security.CredentialProvider
 import kafka.server.{KafkaConfig, ThrottledChannel}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.SaslAuthenticateRequestData
+import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.KafkaChannel.ChannelMuteState
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelState, 
KafkaChannel, ListenerName, NetworkReceive, NetworkSend, Selector, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, 
RequestHeader}
+import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, 
RequestHeader, SaslAuthenticateRequest, SaslHandshakeRequest}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
@@ -107,6 +108,14 @@ class SocketServerTest extends JUnitSuite {
       outgoing.flush()
   }
 
+  def sendApiRequest(socket: Socket, request: AbstractRequest, header: 
RequestHeader) = {
+    val byteBuffer = request.serialize(header)
+    byteBuffer.rewind()
+    val serializedBytes = new Array[Byte](byteBuffer.remaining)
+    byteBuffer.get(serializedBytes)
+    sendRequest(socket, serializedBytes)
+  }
+
   def receiveResponse(socket: Socket): Array[Byte] = {
     val incoming = new DataInputStream(socket.getInputStream)
     val len = incoming.readInt()
@@ -654,7 +663,72 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
-  def testSessionPrincipal() {
+  def testSaslReauthenticationFailure(): Unit = {
+    shutdownServerAndMetrics(server) // we will use our own instance because 
we require custom configs
+    val username = "admin"
+    val password = "admin-secret"
+    val reauthMs = 1500
+    val brokerProps = new Properties
+    brokerProps.setProperty("listeners", "SASL_PLAINTEXT://localhost:0")
+    brokerProps.setProperty("security.inter.broker.protocol", "SASL_PLAINTEXT")
+    
brokerProps.setProperty("listener.name.sasl_plaintext.plain.sasl.jaas.config",
+      "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+        "username=\"%s\" password=\"%s\" user_%s=\"%s\";".format(username, 
password, username, password))
+    brokerProps.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN")
+    
brokerProps.setProperty("listener.name.sasl_plaintext.sasl.enabled.mechanisms", 
"PLAIN")
+    brokerProps.setProperty("num.network.threads", "1")
+    brokerProps.setProperty("connections.max.reauth.ms", reauthMs.toString)
+    val overrideProps = TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect,
+      saslProperties = Some(brokerProps), enableSaslPlaintext = true)
+    val time = new MockTime()
+    val overrideServer = new 
TestableSocketServer(KafkaConfig.fromProps(overrideProps), time = time)
+    try {
+      overrideServer.startup()
+      val socket = connect(overrideServer, 
ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT))
+
+      val correlationId = -1
+      val clientId = ""
+      // send a SASL handshake request
+      val saslHandshakeRequest = new SaslHandshakeRequest.Builder(new 
SaslHandshakeRequestData().setMechanism("PLAIN"))
+        .build()
+      val saslHandshakeHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE, 
saslHandshakeRequest.version, clientId,
+        correlationId)
+      sendApiRequest(socket, saslHandshakeRequest, saslHandshakeHeader)
+      receiveResponse(socket)
+
+      // now send credentials within a SaslAuthenticateRequest
+      val authBytes = "admin\u0000admin\u0000admin-secret".getBytes("UTF-8")
+      val saslAuthenticateRequest = new SaslAuthenticateRequest.Builder(new 
SaslAuthenticateRequestData()
+        .setAuthBytes(authBytes)).build()
+      val saslAuthenticateHeader = new 
RequestHeader(ApiKeys.SASL_AUTHENTICATE, saslAuthenticateRequest.version,
+        clientId, correlationId)
+      sendApiRequest(socket, saslAuthenticateRequest, saslAuthenticateHeader)
+      receiveResponse(socket)
+      assertEquals(1, overrideServer.testableSelector.channels.size)
+
+      // advance the clock long enough to cause server-side disconnection upon 
next send...
+      time.sleep(reauthMs * 2)
+      // ...and now send something to trigger the disconnection
+      val ackTimeoutMs = 10000
+      val ack = 0: Short
+      val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, 
ackTimeoutMs,
+        new HashMap[TopicPartition, MemoryRecords]()).build()
+      val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, 
emptyRequest.version, clientId, correlationId)
+      sendApiRequest(socket, emptyRequest, emptyHeader)
+      // wait a little bit for the server-side disconnection to occur since it 
happens asynchronously
+      try {
+        TestUtils.waitUntilTrue(() => 
overrideServer.testableSelector.channels.isEmpty,
+          "Expired connection was not closed", 1000, 100)
+      } finally {
+        socket.close()
+      }
+    } finally {
+      shutdownServerAndMetrics(overrideServer)
+    }
+  }
+
+  @Test
+  def testSessionPrincipal(): Unit = {
     val socket = connect()
     val bytes = new Array[Byte](40)
     sendRequest(socket, bytes, Some(0))
@@ -1191,8 +1265,9 @@ class SocketServerTest extends JUnitSuite {
     }
   }
 
-  class TestableSocketServer(config : KafkaConfig = config, val 
connectionQueueSize: Int = 20) extends SocketServer(config,
-      new Metrics, Time.SYSTEM, credentialProvider) {
+  class TestableSocketServer(config : KafkaConfig = config, val 
connectionQueueSize: Int = 20,
+                             override val time: Time = Time.SYSTEM) extends 
SocketServer(config,
+      new Metrics, time, credentialProvider) {
 
     @volatile var selector: Option[TestableSelector] = None
 

Reply via email to