This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d1bfa0  KAFKA-6950: Delay response to failed client authentication to 
prevent potential DoS issues (KIP-306) (#5082)
5d1bfa0 is described below

commit 5d1bfa0665ce0850ee76cee152e397c23ac329a7
Author: Dhruvil Shah <[email protected]>
AuthorDate: Fri Aug 31 04:04:33 2018 -0700

    KAFKA-6950: Delay response to failed client authentication to prevent 
potential DoS issues (KIP-306) (#5082)
    
    Reviewers: Ron Dagostino <[email protected]>, Ismael Juma 
<[email protected]>, Rajini Sivaram <[email protected]>
---
 .../common/errors/AuthenticationException.java     |   4 +
 .../apache/kafka/common/network/Authenticator.java |   9 +-
 .../DelayedResponseAuthenticationException.java    |  27 +++
 .../apache/kafka/common/network/KafkaChannel.java  |  27 ++-
 .../org/apache/kafka/common/network/Selector.java  | 142 ++++++++++++-
 .../authenticator/SaslServerAuthenticator.java     |  31 ++-
 .../kafka/common/network/NetworkTestUtils.java     |  36 +++-
 .../apache/kafka/common/network/NioEchoServer.java |  12 +-
 .../common/network/SslTransportLayerTest.java      |  20 +-
 .../ClientAuthenticationFailureTest.java           |   4 +-
 .../SaslAuthenticatorFailureDelayTest.java         | 229 +++++++++++++++++++++
 .../authenticator/SaslAuthenticatorTest.java       |  13 +-
 .../main/scala/kafka/network/SocketServer.scala    |   3 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |  12 ++
 .../SaslGssapiSslEndToEndAuthorizationTest.scala   |   4 +-
 .../scala/integration/kafka/api/SaslSetup.scala    |   8 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |  43 +++-
 .../unit/kafka/network/SocketServerTest.scala      |  31 +--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   1 +
 .../scala/unit/kafka/utils/JaasTestUtils.scala     |   9 +-
 20 files changed, 603 insertions(+), 62 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
index f6458c6..7a05eba 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
@@ -40,6 +40,10 @@ public class AuthenticationException extends ApiException {
         super(message);
     }
 
+    public AuthenticationException(Throwable cause) {
+        super(cause);
+    }
+
     public AuthenticationException(String message, Throwable cause) {
         super(message, cause);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 4e2e727..33c2e90 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -38,6 +38,14 @@ public interface Authenticator extends Closeable {
     void authenticate() throws AuthenticationException, IOException;
 
     /**
+     * Perform any processing related to authentication failure. This is 
invoked when the channel is about to be closed
+     * because of an {@link AuthenticationException} thrown from a prior 
{@link #authenticate()} call.
+     * @throws IOException if read/write fails due to an I/O error
+     */
+    default void handleAuthenticationFailure() throws IOException {
+    }
+
+    /**
      * Returns Principal using PrincipalBuilder
      */
     KafkaPrincipal principal();
@@ -46,5 +54,4 @@ public interface Authenticator extends Closeable {
      * returns true if authentication is complete otherwise returns false;
      */
     boolean complete();
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
 
b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
new file mode 100644
index 0000000..8474426
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.errors.AuthenticationException;
+
+public class DelayedResponseAuthenticationException extends 
AuthenticationException {
+    private static final long serialVersionUID = 1L;
+
+    public DelayedResponseAuthenticationException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 1839729..17dc6a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -120,15 +120,22 @@ public class KafkaChannel {
      * authentication. For SASL, authentication is performed by {@link 
Authenticator#authenticate()}.
      */
     public void prepare() throws AuthenticationException, IOException {
+        boolean authenticating = false;
         try {
             if (!transportLayer.ready())
                 transportLayer.handshake();
-            if (transportLayer.ready() && !authenticator.complete())
+            if (transportLayer.ready() && !authenticator.complete()) {
+                authenticating = true;
                 authenticator.authenticate();
+            }
         } catch (AuthenticationException e) {
             // Clients are notified of authentication exceptions to enable 
operations to be terminated
             // without retries. Other errors are handled as network exceptions 
in Selector.
             state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, 
e);
+            if (authenticating) {
+                delayCloseOnAuthenticationFailure();
+                throw new DelayedResponseAuthenticationException(e);
+            }
             throw e;
         }
         if (ready())
@@ -237,6 +244,24 @@ public class KafkaChannel {
     }
 
     /**
+     * Delay channel close on authentication failure. This will remove all 
read/write operations from the channel until
+     * {@link #completeCloseOnAuthenticationFailure()} is called to finish up 
the channel close.
+     */
+    private void delayCloseOnAuthenticationFailure() {
+        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+    }
+
+    /**
+     * Finish up any processing on {@link #prepare()} failure.
+     * @throws IOException
+     */
+    void completeCloseOnAuthenticationFailure() throws IOException {
+        transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+        // Invoke the underlying handler to finish up any processing on 
authentication failure
+        authenticator.handleAuthenticationFailure();
+    }
+
+    /**
      * Returns true if this channel has been explicitly muted using {@link 
KafkaChannel#mute()}
      */
     public boolean isMute() {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 7e32509..806bda7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -86,6 +86,7 @@ import java.util.concurrent.TimeUnit;
 public class Selector implements Selectable, AutoCloseable {
 
     public static final long NO_IDLE_TIMEOUT_MS = -1;
+    public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;
 
     private enum CloseMode {
         GRACEFUL(true),            // process outstanding staged receives, 
notify disconnect
@@ -119,8 +120,11 @@ public class Selector implements Selectable, AutoCloseable 
{
     private final int maxReceiveSize;
     private final boolean recordTimePerConnection;
     private final IdleExpiryManager idleExpiryManager;
+    private final LinkedHashMap<String, DelayedAuthenticationFailureClose> 
delayedClosingChannels;
     private final MemoryPool memoryPool;
     private final long lowMemThreshold;
+    private final int failedAuthenticationDelayMs;
+
     //indicates if the previous call to poll was able to make progress in 
reading already-buffered data.
     //this is used to prevent tight loops when memory is not available to read 
any more data
     private boolean madeReadProgressLastPoll = true;
@@ -129,6 +133,8 @@ public class Selector implements Selectable, AutoCloseable {
      * Create a new nioSelector
      * @param maxReceiveSize Max size in bytes of a single network receive 
(use {@link NetworkReceive#UNLIMITED} for no limit)
      * @param connectionMaxIdleMs Max idle connection time (use {@link 
#NO_IDLE_TIMEOUT_MS} to disable idle timeout)
+     * @param failedAuthenticationDelayMs Minimum time by which failed 
authentication response and channel close should be delayed by.
+     *                                    Use {@link 
#NO_FAILED_AUTHENTICATION_DELAY} to disable this delay.
      * @param metrics Registry for Selector metrics
      * @param time Time implementation
      * @param metricGrpPrefix Prefix for the group of metrics registered by 
Selector
@@ -139,6 +145,7 @@ public class Selector implements Selectable, AutoCloseable {
      */
     public Selector(int maxReceiveSize,
             long connectionMaxIdleMs,
+            int failedAuthenticationDelayMs,
             Metrics metrics,
             Time time,
             String metricGrpPrefix,
@@ -174,9 +181,40 @@ public class Selector implements Selectable, AutoCloseable 
{
         this.memoryPool = memoryPool;
         this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
         this.log = logContext.logger(Selector.class);
+        this.failedAuthenticationDelayMs = failedAuthenticationDelayMs;
+        this.delayedClosingChannels = (failedAuthenticationDelayMs > 
NO_FAILED_AUTHENTICATION_DELAY) ? new LinkedHashMap<String, 
DelayedAuthenticationFailureClose>() : null;
+    }
+
+    public Selector(int maxReceiveSize,
+                    long connectionMaxIdleMs,
+                    Metrics metrics,
+                    Time time,
+                    String metricGrpPrefix,
+                    Map<String, String> metricTags,
+                    boolean metricsPerConnection,
+                    boolean recordTimePerConnection,
+                    ChannelBuilder channelBuilder,
+                    MemoryPool memoryPool,
+                    LogContext logContext) {
+        this(maxReceiveSize, connectionMaxIdleMs, 
NO_FAILED_AUTHENTICATION_DELAY, metrics, time, metricGrpPrefix, metricTags,
+                metricsPerConnection, recordTimePerConnection, channelBuilder, 
memoryPool, logContext);
     }
 
     public Selector(int maxReceiveSize,
+                long connectionMaxIdleMs,
+                int failedAuthenticationDelayMs,
+                Metrics metrics,
+                Time time,
+                String metricGrpPrefix,
+                Map<String, String> metricTags,
+                boolean metricsPerConnection,
+                ChannelBuilder channelBuilder,
+                LogContext logContext) {
+        this(maxReceiveSize, connectionMaxIdleMs, failedAuthenticationDelayMs, 
metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, 
channelBuilder, MemoryPool.NONE, logContext);
+    }
+
+
+    public Selector(int maxReceiveSize,
             long connectionMaxIdleMs,
             Metrics metrics,
             Time time,
@@ -185,13 +223,17 @@ public class Selector implements Selectable, 
AutoCloseable {
             boolean metricsPerConnection,
             ChannelBuilder channelBuilder,
             LogContext logContext) {
-        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, 
metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, 
MemoryPool.NONE, logContext);
+        this(maxReceiveSize, connectionMaxIdleMs, 
NO_FAILED_AUTHENTICATION_DELAY, metrics, time, metricGrpPrefix, metricTags, 
metricsPerConnection, channelBuilder, logContext);
     }
 
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, 
String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
         this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, 
metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, 
logContext);
     }
 
+    public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, 
Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder 
channelBuilder, LogContext logContext) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, 
failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, 
Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
+    }
+
     /**
      * Begin connecting to the given address and add the connection to this 
nioSelector associated with the given id
      * number.
@@ -435,6 +477,9 @@ public class Selector implements Selectable, AutoCloseable {
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
 
+        // Close channels that were delayed and are now ready to be closed
+        completeDelayedChannelClose(endIo);
+
         // we use the time at the end of select to ensure that we don't close 
any connections that
         // have just been processed in pollSelectionKeys
         maybeCloseOldestConnection(endSelect);
@@ -457,15 +502,14 @@ public class Selector implements Selectable, 
AutoCloseable {
         for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
             KafkaChannel channel = channel(key);
             long channelStartTimeNanos = recordTimePerConnection ? 
time.nanoseconds() : 0;
+            boolean sendFailed = false;
 
             // register all per-connection metrics at once
             sensors.maybeRegisterConnectionMetrics(channel.id());
             if (idleExpiryManager != null)
                 idleExpiryManager.update(channel.id(), currentTimeNanos);
 
-            boolean sendFailed = false;
             try {
-
                 /* complete any connections that have finished their handshake 
(either normally or immediately) */
                 if (isImmediatelyConnected || key.isConnectable()) {
                     if (channel.finishConnect()) {
@@ -477,8 +521,9 @@ public class Selector implements Selectable, AutoCloseable {
                                 socketChannel.socket().getSendBufferSize(),
                                 socketChannel.socket().getSoTimeout(),
                                 channel.id());
-                    } else
+                    } else {
                         continue;
+                    }
                 }
 
                 /* if channel is not ready finish prepare */
@@ -532,7 +577,11 @@ public class Selector implements Selectable, AutoCloseable 
{
                     log.debug("Connection with {} disconnected due to 
authentication exception", desc, e);
                 else
                     log.warn("Unexpected error from {}; closing connection", 
desc, e);
-                close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : 
CloseMode.GRACEFUL);
+
+                if (e instanceof DelayedResponseAuthenticationException)
+                    maybeDelayCloseOnAuthenticationFailure(channel);
+                else
+                    close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : 
CloseMode.GRACEFUL);
             } finally {
                 maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
@@ -631,6 +680,18 @@ public class Selector implements Selectable, AutoCloseable 
{
             unmute(channel);
     }
 
+    // package-private for testing
+    void completeDelayedChannelClose(long currentTimeNanos) {
+        if (delayedClosingChannels == null)
+            return;
+
+        while (!delayedClosingChannels.isEmpty()) {
+            DelayedAuthenticationFailureClose delayedClose = 
delayedClosingChannels.values().iterator().next();
+            if (!delayedClose.tryClose(currentTimeNanos))
+                break;
+        }
+    }
+
     private void maybeCloseOldestConnection(long currentTimeNanos) {
         if (idleExpiryManager == null)
             return;
@@ -657,6 +718,7 @@ public class Selector implements Selectable, AutoCloseable {
         this.completedReceives.clear();
         this.connected.clear();
         this.disconnected.clear();
+
         // Remove closed channels after all their staged receives have been 
processed or if a send was requested
         for (Iterator<Map.Entry<String, KafkaChannel>> it = 
closingChannels.entrySet().iterator(); it.hasNext(); ) {
             KafkaChannel channel = it.next().getValue();
@@ -667,6 +729,7 @@ public class Selector implements Selectable, AutoCloseable {
                 it.remove();
             }
         }
+
         for (String channel : this.failedSends)
             this.disconnected.put(channel, ChannelState.FAILED_SEND);
         this.failedSends.clear();
@@ -707,6 +770,24 @@ public class Selector implements Selectable, AutoCloseable 
{
         }
     }
 
+    private void maybeDelayCloseOnAuthenticationFailure(KafkaChannel channel) {
+        DelayedAuthenticationFailureClose delayedClose = new 
DelayedAuthenticationFailureClose(channel, failedAuthenticationDelayMs);
+        if (delayedClosingChannels != null)
+            delayedClosingChannels.put(channel.id(), delayedClose);
+        else
+            delayedClose.closeNow();
+    }
+
+    private void handleCloseOnAuthenticationFailure(KafkaChannel channel) {
+        try {
+            channel.completeCloseOnAuthenticationFailure();
+        } catch (Exception e) {
+            log.error("Exception handling close on authentication failure node 
{}", channel.id(), e);
+        } finally {
+            close(channel, CloseMode.GRACEFUL);
+        }
+    }
+
     /**
      * Begin closing this connection.
      * If 'closeMode' is `CloseMode.GRACEFUL`, the channel is disconnected 
here, but staged receives
@@ -735,10 +816,14 @@ public class Selector implements Selectable, 
AutoCloseable {
             // stagedReceives will be moved to completedReceives later along 
with receives from other channels
             closingChannels.put(channel.id(), channel);
             log.debug("Tracking closing connection {} to process outstanding 
requests", channel.id());
-        } else
+        } else {
             doClose(channel, closeMode.notifyDisconnect);
+        }
         this.channels.remove(channel.id());
 
+        if (delayedClosingChannels != null)
+            delayedClosingChannels.remove(channel.id());
+
         if (idleExpiryManager != null)
             idleExpiryManager.remove(channel.id());
     }
@@ -1064,6 +1149,46 @@ public class Selector implements Selectable, 
AutoCloseable {
         }
     }
 
+    /**
+     * Encapsulate a channel that must be closed after a specific delay has 
elapsed due to authentication failure.
+     */
+    private class DelayedAuthenticationFailureClose {
+        private final KafkaChannel channel;
+        private final long endTimeNanos;
+        private boolean closed;
+
+        /**
+         * @param channel The channel whose close is being delayed
+         * @param delayMs The amount of time by which the operation should be 
delayed
+         */
+        public DelayedAuthenticationFailureClose(KafkaChannel channel, int 
delayMs) {
+            this.channel = channel;
+            this.endTimeNanos = time.nanoseconds() + (delayMs * 1000L * 1000L);
+            this.closed = false;
+        }
+
+        /**
+         * Try to close this channel if the delay has expired.
+         * @param currentTimeNanos The current time
+         * @return True if the delay has expired and the channel was closed; 
false otherwise
+         */
+        public final boolean tryClose(long currentTimeNanos) {
+            if (endTimeNanos <= currentTimeNanos)
+                closeNow();
+            return closed;
+        }
+
+        /**
+         * Close the channel now, regardless of whether the delay has expired 
or not.
+         */
+        public final void closeNow() {
+            if (closed)
+                throw new IllegalStateException("Attempt to close a channel 
that has already been closed");
+            handleCloseOnAuthenticationFailure(channel);
+            closed = true;
+        }
+    }
+
     // helper class for tracking least recently used connections to enable 
idle connection closing
     private static class IdleExpiryManager {
         private final Map<String, Long> lruConnections;
@@ -1114,4 +1239,9 @@ public class Selector implements Selectable, 
AutoCloseable {
     boolean isMadeReadProgressLastPoll() {
         return madeReadProgressLastPoll;
     }
+
+    // package-private for testing
+    Map<?, ?> delayedClosingChannels() {
+        return delayedClosingChannels;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index e8f77a5..43cb0a4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -118,6 +118,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
+    private Send authenticationFailureSend = null;
     // flag indicating if sasl tokens are sent as Kafka SaslAuthenticate 
request/responses
     private boolean enableKafkaSaslAuthenticateHeaders;
 
@@ -295,6 +296,11 @@ public class SaslServerAuthenticator implements 
Authenticator {
     }
 
     @Override
+    public void handleAuthenticationFailure() throws IOException {
+        sendAuthenticationFailureResponse();
+    }
+
+    @Override
     public void close() throws IOException {
         if (principalBuilder instanceof Closeable)
             Utils.closeQuietly((Closeable) principalBuilder, "principal 
builder");
@@ -362,7 +368,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
             RequestAndSize requestAndSize = 
requestContext.parseRequest(requestBuffer);
             if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
                 IllegalSaslStateException e = new 
IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " 
during SASL authentication.");
-                sendKafkaResponse(requestContext, 
requestAndSize.request.getErrorResponse(e));
+                buildResponseOnAuthenticateFailure(requestContext, 
requestAndSize.request.getErrorResponse(e));
                 throw e;
             }
             if (!apiKey.isVersionSupported(version)) {
@@ -378,7 +384,8 @@ public class SaslServerAuthenticator implements 
Authenticator {
                 ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER 
: ByteBuffer.wrap(responseToken);
                 sendKafkaResponse(requestContext, new 
SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
             } catch (SaslAuthenticationException e) {
-                sendKafkaResponse(requestContext, new 
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
+                buildResponseOnAuthenticateFailure(requestContext,
+                        new 
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
                 throw e;
             } catch (SaslException e) {
                 KerberosError kerberosError = KerberosError.fromException(e);
@@ -464,7 +471,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
             return clientMechanism;
         } else {
             LOG.debug("SASL mechanism '{}' requested by client is not 
supported", clientMechanism);
-            sendKafkaResponse(context, new 
SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
+            buildResponseOnAuthenticateFailure(context, new 
SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
             throw new UnsupportedSaslMechanismException("Unsupported SASL 
mechanism " + clientMechanism);
         }
     }
@@ -491,6 +498,24 @@ public class SaslServerAuthenticator implements 
Authenticator {
         }
     }
 
+    /**
+     * Build a {@link Send} response on {@link #authenticate()} failure. The 
actual response is sent out when
+     * {@link #sendAuthenticationFailureResponse()} is called.
+     */
+    private void buildResponseOnAuthenticateFailure(RequestContext context, 
AbstractResponse response) {
+        authenticationFailureSend = context.buildResponse(response);
+    }
+
+    /**
+     * Send any authentication failure response that may have been previously 
built.
+     */
+    private void sendAuthenticationFailureResponse() throws IOException {
+        if (authenticationFailureSend == null)
+            return;
+        sendKafkaResponse(authenticationFailureSend);
+        authenticationFailureSend = null;
+    }
+
     private void sendKafkaResponse(RequestContext context, AbstractResponse 
response) throws IOException {
         sendKafkaResponse(context.buildResponse(response));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index 5998049..b08c8c1 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -28,6 +29,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 
@@ -35,16 +37,22 @@ import org.apache.kafka.test.TestUtils;
  * Common utility functions used by transport layer and authenticator tests.
  */
 public class NetworkTestUtils {
+    public static NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol,
+                                                 AbstractConfig serverConfig, 
CredentialCache credentialCache, Time time) throws Exception {
+        return createEchoServer(listenerName, securityProtocol, serverConfig, 
credentialCache, 100, time);
+    }
 
     public static NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol,
-            AbstractConfig serverConfig, CredentialCache credentialCache) 
throws Exception {
-        NioEchoServer server = new NioEchoServer(listenerName, 
securityProtocol, serverConfig, "localhost", null, credentialCache);
+                                                 AbstractConfig serverConfig, 
CredentialCache credentialCache,
+                                                 int 
failedAuthenticationDelayMs, Time time) throws Exception {
+        NioEchoServer server = new NioEchoServer(listenerName, 
securityProtocol, serverConfig, "localhost",
+                null, credentialCache, failedAuthenticationDelayMs, time);
         server.start();
         return server;
     }
 
-    public static Selector createSelector(ChannelBuilder channelBuilder) {
-        return new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder, new LogContext());
+    public static Selector createSelector(ChannelBuilder channelBuilder, Time 
time) {
+        return new Selector(5000, new Metrics(), time, "MetricGroup", 
channelBuilder, new LogContext());
     }
 
     public static void checkClientConnection(Selector selector, String node, 
int minMessageSize, int messageCount) throws Exception {
@@ -79,19 +87,33 @@ public class NetworkTestUtils {
         assertTrue(selector.isChannelReady(node));
     }
 
-    public static ChannelState waitForChannelClose(Selector selector, String 
node, ChannelState.State channelState)
+    public static ChannelState waitForChannelClose(Selector selector, String 
node, ChannelState.State channelState, MockTime mockTime)
             throws IOException {
         boolean closed = false;
-        for (int i = 0; i < 30; i++) {
-            selector.poll(1000L);
+        for (int i = 0; i < 300; i++) {
+            selector.poll(100L);
             if (selector.channel(node) == null && 
selector.closingChannel(node) == null) {
                 closed = true;
                 break;
             }
+            if (mockTime != null)
+                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 150);
         }
         assertTrue("Channel was not closed by timeout", closed);
         ChannelState finalState = selector.disconnected().get(node);
         assertEquals(channelState, finalState.state());
         return finalState;
     }
+
+    public static ChannelState waitForChannelClose(Selector selector, String 
node, ChannelState.State channelState) throws IOException {
+        return waitForChannelClose(selector, node, channelState, null);
+    }
+
+    public static void completeDelayedChannelClose(Selector selector, long 
currentTimeNanos) {
+        selector.completeDelayedChannelClose(currentTimeNanos);
+    }
+
+    public static Map<?, ?> delayedClosingChannels(Selector selector) {
+        return selector.delayedClosingChannels();
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 64b7e4e..bd212fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -27,7 +27,7 @@ import 
org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
@@ -69,7 +69,13 @@ public class NioEchoServer extends Thread {
     private final DelegationTokenCache tokenCache;
 
     public NioEchoServer(ListenerName listenerName, SecurityProtocol 
securityProtocol, AbstractConfig config,
-            String serverHost, ChannelBuilder channelBuilder, CredentialCache 
credentialCache) throws Exception {
+                         String serverHost, ChannelBuilder channelBuilder, 
CredentialCache credentialCache, Time time) throws Exception {
+        this(listenerName, securityProtocol, config, serverHost, 
channelBuilder, credentialCache, 100, time);
+    }
+
+    public NioEchoServer(ListenerName listenerName, SecurityProtocol 
securityProtocol, AbstractConfig config,
+                         String serverHost, ChannelBuilder channelBuilder, 
CredentialCache credentialCache,
+                         int failedAuthenticationDelayMs, Time time) throws 
Exception {
         super("echoserver");
         setDaemon(true);
         serverSocketChannel = ServerSocketChannel.open();
@@ -89,7 +95,7 @@ public class NioEchoServer extends Thread {
         if (channelBuilder == null)
             channelBuilder = 
ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, 
config, credentialCache, tokenCache);
         this.metrics = new Metrics();
-        this.selector = new Selector(5000, metrics, new MockTime(), 
"MetricGroup", channelBuilder, new LogContext());
+        this.selector = new Selector(5000, failedAuthenticationDelayMs, 
metrics, time, "MetricGroup", channelBuilder, new LogContext());
         acceptorThread = new AcceptorThread();
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 919c167..6783438 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
@@ -64,6 +63,7 @@ import static org.junit.Assert.fail;
 public class SslTransportLayerTest {
 
     private static final int BUFFER_SIZE = 4 * 1024;
+    private static Time time = Time.SYSTEM;
 
     private NioEchoServer server;
     private Selector selector;
@@ -82,7 +82,7 @@ public class SslTransportLayerTest {
         sslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder, new LogContext());
+        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", 
channelBuilder, new LogContext());
     }
 
     @After
@@ -204,7 +204,7 @@ public class SslTransportLayerTest {
         };
         serverChannelBuilder.configure(sslServerConfigs);
         server = new 
NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), 
SecurityProtocol.SSL,
-                new TestSecurityConfig(sslServerConfigs), "localhost", 
serverChannelBuilder, null);
+                new TestSecurityConfig(sslServerConfigs), "localhost", 
serverChannelBuilder, null, time);
         server.start();
 
         createSelector(sslClientConfigs);
@@ -786,7 +786,7 @@ public class SslTransportLayerTest {
             channelBuilder.flushFailureAction = flushFailureAction;
             channelBuilder.failureIndex = i;
             channelBuilder.configure(sslClientConfigs);
-            this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder, new LogContext());
+            this.selector = new Selector(5000, new Metrics(), time, 
"MetricGroup", channelBuilder, new LogContext());
 
             InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
             selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -829,7 +829,7 @@ public class SslTransportLayerTest {
             serverChannelBuilder.flushDelayCount = i;
             server = new 
NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
                     SecurityProtocol.SSL, new 
TestSecurityConfig(sslServerConfigs),
-                    "localhost", serverChannelBuilder, null);
+                    "localhost", serverChannelBuilder, null, time);
             server.start();
             createSelector(sslClientConfigs);
             InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
@@ -855,7 +855,7 @@ public class SslTransportLayerTest {
         String node = "0";
         server = createEchoServer(securityProtocol);
         clientChannelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", clientChannelBuilder, new LogContext());
+        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", 
clientChannelBuilder, new LogContext());
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
@@ -895,7 +895,7 @@ public class SslTransportLayerTest {
         ChannelBuilder serverChannelBuilder = 
ChannelBuilders.serverChannelBuilder(listenerName,
                 false, securityProtocol, config, null, null);
         server = new NioEchoServer(listenerName, securityProtocol, config,
-                "localhost", serverChannelBuilder, null);
+                "localhost", serverChannelBuilder, null, time);
         server.start();
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
 
@@ -955,7 +955,7 @@ public class SslTransportLayerTest {
         ChannelBuilder serverChannelBuilder = 
ChannelBuilders.serverChannelBuilder(listenerName,
                 false, securityProtocol, config, null, null);
         server = new NioEchoServer(listenerName, securityProtocol, config,
-                "localhost", serverChannelBuilder, null);
+                "localhost", serverChannelBuilder, null, time);
         server.start();
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
 
@@ -1027,12 +1027,12 @@ public class SslTransportLayerTest {
         channelBuilder.configureBufferSizes(netReadBufSize, netWriteBufSize, 
appBufSize);
         this.channelBuilder = channelBuilder;
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder, new LogContext());
+        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", 
channelBuilder, new LogContext());
         return selector;
     }
 
     private NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol) throws Exception {
-        return NetworkTestUtils.createEchoServer(listenerName, 
securityProtocol, new TestSecurityConfig(sslServerConfigs), null);
+        return NetworkTestUtils.createEchoServer(listenerName, 
securityProtocol, new TestSecurityConfig(sslServerConfigs), null, time);
     }
 
     private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) 
throws Exception {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 413997f..2fce4c5 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,6 +49,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ClientAuthenticationFailureTest {
+    private static MockTime time = new MockTime(50);
 
     private NioEchoServer server;
     private Map<String, Object> saslServerConfigs;
@@ -147,6 +149,6 @@ public class ClientAuthenticationFailureTest {
 
     private NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol) throws Exception {
         return NetworkTestUtils.createEchoServer(listenerName, 
securityProtocol,
-                new TestSecurityConfig(saslServerConfigs), new 
CredentialCache());
+                new TestSecurityConfig(saslServerConfigs), new 
CredentialCache(), time);
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
new file mode 100644
index 0000000..b0dfc7a
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.network.CertStores;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.ChannelState;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.NetworkTestUtils;
+import org.apache.kafka.common.network.NioEchoServer;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class SaslAuthenticatorFailureDelayTest {
+    private static final int BUFFER_SIZE = 4 * 1024;
+    private static MockTime time = new MockTime(50);
+
+    private NioEchoServer server;
+    private Selector selector;
+    private ChannelBuilder channelBuilder;
+    private CertStores serverCertStores;
+    private CertStores clientCertStores;
+    private Map<String, Object> saslClientConfigs;
+    private Map<String, Object> saslServerConfigs;
+    private CredentialCache credentialCache;
+    private long startTimeMs;
+    private final int failedAuthenticationDelayMs;
+
+    public SaslAuthenticatorFailureDelayTest(int failedAuthenticationDelayMs) {
+        this.failedAuthenticationDelayMs = failedAuthenticationDelayMs;
+    }
+
+    @Parameterized.Parameters(name = "failedAuthenticationDelayMs={0}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+        values.add(new Object[]{0});
+        values.add(new Object[]{200});
+        return values;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        LoginManager.closeAll();
+        serverCertStores = new CertStores(true, "localhost");
+        clientCertStores = new CertStores(false, "localhost");
+        saslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
+        saslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
+        credentialCache = new CredentialCache();
+        SaslAuthenticatorTest.TestLogin.loginCount.set(0);
+        startTimeMs = time.milliseconds();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        long now = time.milliseconds();
+        if (server != null)
+            this.server.close();
+        if (selector != null)
+            this.selector.close();
+        if (failedAuthenticationDelayMs != -1)
+            assertTrue("timeSpent: " + (now - startTimeMs), now - startTimeMs 
>= failedAuthenticationDelayMs);
+    }
+
+    /**
+     * Tests that SASL/PLAIN clients with invalid password fail authentication.
+     */
+    @Test
+    public void testInvalidPasswordSaslPlain() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
+        jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, 
"invalidpassword");
+
+        server = createEchoServer(securityProtocol);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, 
"PLAIN",
+                "Authentication failed: Invalid username or password");
+        server.verifyAuthenticationMetrics(0, 1);
+    }
+
+    /**
+     * Tests client connection close before response for authentication 
failure is sent.
+     */
+    @Test
+    public void testClientConnectionClose() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
+        jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, 
"invalidpassword");
+
+        server = createEchoServer(securityProtocol);
+        createClientConnection(securityProtocol, node);
+
+        Map<?, ?> delayedClosingChannels = 
NetworkTestUtils.delayedClosingChannels(server.selector());
+
+        // Wait until server has established connection with client and has 
processed the auth failure
+        TestUtils.waitForCondition(() -> {
+            poll(selector);
+            return !server.selector().channels().isEmpty();
+        }, "Timeout waiting for connection");
+        TestUtils.waitForCondition(() -> {
+            poll(selector);
+            return failedAuthenticationDelayMs == 0 || 
!delayedClosingChannels.isEmpty();
+        }, "Timeout waiting for auth failure");
+
+        selector.close();
+        selector = null;
+
+        // Now that client connection is closed, wait until server notices the 
disconnection and removes it from the
+        // list of connected channels and from delayed response for auth 
failure
+        TestUtils.waitForCondition(() -> failedAuthenticationDelayMs == 0 || 
delayedClosingChannels.isEmpty(),
+                "Timeout waiting for delayed response remove");
+        TestUtils.waitForCondition(() -> 
server.selector().channels().isEmpty(),
+                "Timeout waiting for connection close");
+
+        // Try forcing completion of delayed channel close
+        TestUtils.waitForCondition(() -> time.milliseconds() > startTimeMs + 
failedAuthenticationDelayMs + 1,
+                "Timeout when waiting for auth failure response timeout to 
elapse");
+        NetworkTestUtils.completeDelayedChannelClose(server.selector(), 
time.nanoseconds());
+    }
+
+    private void poll(Selector selector) {
+        try {
+            selector.poll(50);
+        } catch (IOException e) {
+            Assert.fail("Caught unexpected exception " + e);
+        }
+    }
+
+    private TestJaasConfig configureMechanisms(String clientMechanism, 
List<String> serverMechanisms) {
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism);
+        
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
serverMechanisms);
+        if (serverMechanisms.contains("DIGEST-MD5")) {
+            saslServerConfigs.put("digest-md5." + 
BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS,
+                    
TestDigestLoginModule.DigestServerCallbackHandler.class.getName());
+        }
+        return TestJaasConfig.createConfiguration(clientMechanism, 
serverMechanisms);
+    }
+
+    private void createSelector(SecurityProtocol securityProtocol, Map<String, 
Object> clientConfigs) {
+        if (selector != null) {
+            selector.close();
+            selector = null;
+        }
+
+        String saslMechanism = (String) 
saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
+        this.channelBuilder = 
ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
+                new TestSecurityConfig(clientConfigs), null, saslMechanism, 
true);
+        this.selector = NetworkTestUtils.createSelector(channelBuilder, time);
+    }
+
+    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) 
throws Exception {
+        return 
createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), 
securityProtocol);
+    }
+
+    private NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol) throws Exception {
+        if (failedAuthenticationDelayMs != -1)
+            return NetworkTestUtils.createEchoServer(listenerName, 
securityProtocol,
+                    new TestSecurityConfig(saslServerConfigs), 
credentialCache, failedAuthenticationDelayMs, time);
+        else
+            return NetworkTestUtils.createEchoServer(listenerName, 
securityProtocol,
+                    new TestSecurityConfig(saslServerConfigs), 
credentialCache, time);
+    }
+
+    private void createClientConnection(SecurityProtocol securityProtocol, 
String node) throws Exception {
+        createSelector(securityProtocol, saslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+    }
+
+    private void createAndCheckClientAuthenticationFailure(SecurityProtocol 
securityProtocol, String node,
+                                                           String mechanism, 
String expectedErrorMessage) throws Exception {
+        ChannelState finalState = 
createAndCheckClientConnectionFailure(securityProtocol, node);
+        Exception exception = finalState.exception();
+        assertTrue("Invalid exception class " + exception.getClass(), 
exception instanceof SaslAuthenticationException);
+        if (expectedErrorMessage == null)
+            expectedErrorMessage = "Authentication failed due to invalid 
credentials with SASL mechanism " + mechanism;
+        assertEquals(expectedErrorMessage, exception.getMessage());
+    }
+
+    private ChannelState 
createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String 
node)
+            throws Exception {
+        createClientConnection(securityProtocol, node);
+        ChannelState finalState = 
NetworkTestUtils.waitForChannelClose(selector, node,
+                ChannelState.State.AUTHENTICATION_FAILED, time);
+        selector.close();
+        selector = null;
+        return finalState;
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index b8894f1..74058eb 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -92,6 +92,7 @@ import 
org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import 
org.apache.kafka.common.security.authenticator.TestDigestLoginModule.DigestServerCallbackHandler;
 import 
org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
 
+import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -107,6 +108,7 @@ import static org.junit.Assert.fail;
 public class SaslAuthenticatorTest {
 
     private static final int BUFFER_SIZE = 4 * 1024;
+    private static Time time = Time.SYSTEM;
 
     private NioEchoServer server;
     private Selector selector;
@@ -1308,7 +1310,7 @@ public class SaslAuthenticatorTest {
         };
         serverChannelBuilder.configure(saslServerConfigs);
         server = new NioEchoServer(listenerName, securityProtocol, new 
TestSecurityConfig(saslServerConfigs),
-                "localhost", serverChannelBuilder, credentialCache);
+                "localhost", serverChannelBuilder, credentialCache, time);
         server.start();
         return server;
     }
@@ -1347,7 +1349,7 @@ public class SaslAuthenticatorTest {
             }
         };
         clientChannelBuilder.configure(saslClientConfigs);
-        this.selector = NetworkTestUtils.createSelector(clientChannelBuilder);
+        this.selector = NetworkTestUtils.createSelector(clientChannelBuilder, 
time);
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
     }
@@ -1452,7 +1454,7 @@ public class SaslAuthenticatorTest {
         String saslMechanism = (String) 
saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
         this.channelBuilder = 
ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
                 new TestSecurityConfig(clientConfigs), null, saslMechanism, 
true);
-        this.selector = NetworkTestUtils.createSelector(channelBuilder);
+        this.selector = NetworkTestUtils.createSelector(channelBuilder, time);
     }
 
     private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) 
throws Exception {
@@ -1461,7 +1463,7 @@ public class SaslAuthenticatorTest {
 
     private NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol) throws Exception {
         return NetworkTestUtils.createEchoServer(listenerName, 
securityProtocol,
-                new TestSecurityConfig(saslServerConfigs), credentialCache);
+                new TestSecurityConfig(saslServerConfigs), credentialCache, 
time);
     }
 
     private void createClientConnection(SecurityProtocol securityProtocol, 
String node) throws Exception {
@@ -1490,8 +1492,7 @@ public class SaslAuthenticatorTest {
     private ChannelState 
createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String 
node)
             throws Exception {
         createClientConnection(securityProtocol, node);
-        ChannelState finalState = 
NetworkTestUtils.waitForChannelClose(selector, node,
-                ChannelState.State.AUTHENTICATION_FAILED);
+        ChannelState finalState = 
NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.State.AUTHENTICATION_FAILED);
         selector.close();
         selector = null;
         return finalState;
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 96feee8..1365f90 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -245,6 +245,7 @@ class SocketServer(val config: KafkaConfig, val metrics: 
Metrics, val time: Time
       requestChannel,
       connectionQuotas,
       config.connectionsMaxIdleMs,
+      config.failedAuthenticationDelayMs,
       listenerName,
       securityProtocol,
       config,
@@ -502,6 +503,7 @@ private[kafka] class Processor(val id: Int,
                                requestChannel: RequestChannel,
                                connectionQuotas: ConnectionQuotas,
                                connectionsMaxIdleMs: Long,
+                               failedAuthenticationDelayMs: Int,
                                listenerName: ListenerName,
                                securityProtocol: SecurityProtocol,
                                config: KafkaConfig,
@@ -563,6 +565,7 @@ private[kafka] class Processor(val id: Int,
     new KSelector(
       maxRequestSize,
       connectionsMaxIdleMs,
+      failedAuthenticationDelayMs,
       metrics,
       time,
       "socket-server",
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 334d496..753a5b9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -77,6 +77,7 @@ object Defaults {
   val MaxConnectionsPerIpOverrides: String = ""
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
   val RequestTimeoutMs = 30000
+  val FailedAuthenticationDelayMs = 100
 
   /** ********* Log Configuration ***********/
   val NumPartitions = 1
@@ -282,6 +283,7 @@ object KafkaConfig {
   val MaxConnectionsPerIpProp = "max.connections.per.ip"
   val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
   val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
+  val FailedAuthenticationDelayMsProp = 
"connection.failed.authentication.delay.ms"
   /***************** rack configuration *************/
   val RackProp = "broker.rack"
   /** ********* Log Configuration ***********/
@@ -537,6 +539,8 @@ object KafkaConfig {
     "configured using " + MaxConnectionsPerIpOverridesProp + " property"
   val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or 
hostname overrides to the default maximum number of connections. An example 
value is \"hostName:100,127.0.0.1:200\""
   val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket 
processor threads close the connections that idle more than this"
+  val FailedAuthenticationDelayMsDoc = "Connection close delay on failed 
authentication: this is the time (in milliseconds) by which connection close 
will be delayed on authentication failure. " +
+    s"This must be configured to be less than $ConnectionsMaxIdleMsProp to 
prevent connection timeout."
   /************* Rack Configuration **************/
   val RackDoc = "Rack of the broker. This will be used in rack aware 
replication assignment for fault tolerance. Examples: `RACK1`, `us-east-1d`"
   /** ********* Log Configuration ***********/
@@ -820,6 +824,7 @@ object KafkaConfig {
       .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, 
atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
       .define(MaxConnectionsPerIpOverridesProp, STRING, 
Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
       .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, 
MEDIUM, ConnectionsMaxIdleMsDoc)
+      .define(FailedAuthenticationDelayMsProp, INT, 
Defaults.FailedAuthenticationDelayMs, atLeast(0), LOW, 
FailedAuthenticationDelayMsDoc)
 
       /************ Rack Configuration ******************/
       .define(RackProp, STRING, null, MEDIUM, RackDoc)
@@ -1101,6 +1106,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   val maxConnectionsPerIpOverrides: Map[String, Int] =
     getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, 
getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => 
(k, v.toInt)}
   val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
+  val failedAuthenticationDelayMs = 
getInt(KafkaConfig.FailedAuthenticationDelayMsProp)
 
   /***************** rack configuration **************/
   val rack = Option(getString(KafkaConfig.RackProp))
@@ -1397,5 +1403,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
     val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address 
=> Utils.validHostPattern(address))
     if (!invalidAddresses.isEmpty)
       throw new 
IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp} 
contains invalid addresses : ${invalidAddresses.mkString(",")}")
+
+    if (connectionsMaxIdleMs >= 0)
+      require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
+        
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs 
should always be less than" +
+          s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to 
prevent failed" +
+          " authentication responses from timing out")
   }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
index a9b4a60..a630293 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -24,10 +24,10 @@ import scala.collection.immutable.List
 class SaslGssapiSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTest {
   override val clientPrincipal = 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName
   override val kafkaPrincipal = 
JaasTestUtils.KafkaServerPrincipalUnqualifiedName
-  
+
   override protected def kafkaClientSaslMechanism = "GSSAPI"
   override protected def kafkaServerSaslMechanisms = List("GSSAPI")
-  
+
   // Configure brokers to require SSL client authentication in order to verify 
that SASL_SSL works correctly even if the
   // client doesn't have a keystore. We want to cover the scenario where a 
broker requires either SSL client
   // authentication or SASL authentication with SSL as the transport layer 
(but not both).
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 3913212..81de105 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -138,8 +138,12 @@ trait SaslSetup {
     props
   }
 
-  def jaasClientLoginModule(clientSaslMechanism: String): String =
-    JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
+  def jaasClientLoginModule(clientSaslMechanism: String, serviceName: 
Option[String] = None): String = {
+    if (serviceName.isDefined)
+      JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile, 
serviceName.get)
+    else
+      JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
+  }
 
   def createScramCredentials(zkConnect: String, userName: String, password: 
String): Unit = {
     val credentials = ScramMechanism.values.map(m => 
s"${m.mechanismName}=[iterations=4096,password=$password]")
diff --git 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 74b2a15..cbe8462 100644
--- 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -19,19 +19,25 @@
 package kafka.server
 
 import java.net.InetSocketAddress
+import java.time.Duration
 import java.util.Properties
 import java.util.concurrent.{Executors, TimeUnit}
 
 import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.MockTime
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
+import scala.collection.JavaConverters._
+
 class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
   override val serverCount = 1
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -43,10 +49,17 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
   private val executor = Executors.newFixedThreadPool(numThreads)
   private val clientConfig: Properties = new Properties
   private var serverAddr: InetSocketAddress = _
+  private val time = new MockTime(10)
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  private val failedAuthenticationDelayMs = 2000
 
   @Before
   override def setUp() {
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Option(kafkaClientSaslMechanism), Both))
+    serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
+    serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp, 
failedAuthenticationDelayMs.toString)
     super.setUp()
     serverAddr = new InetSocketAddress("localhost",
       
servers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))
@@ -55,6 +68,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness 
with SaslSetup {
     clientConfig.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
     clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, 
jaasClientLoginModule(kafkaClientSaslMechanism))
     clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, 
"5000")
+
+    // create the test topic with all the brokers as replicas
+    createTopic(topic, 2, serverCount)
   }
 
   @After
@@ -94,6 +110,31 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
   }
 
   /**
+   * Test that when client fails to verify authenticity of the server, the 
resulting failed authentication exception
+   * is thrown immediately, and is not affected by 
<code>connection.failed.authentication.delay.ms</code>.
+   */
+  @Test
+  def testServerAuthenticationFailure(): Unit = {
+    // Setup client with a non-existent service principal, so that server 
authentication fails on the client
+    val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism, 
Some("another-kafka-service"))
+    val configOverrides = new Properties()
+    configOverrides.setProperty(SaslConfigs.SASL_JAAS_CONFIG, 
clientLoginContext)
+    val consumer = createConsumer(configOverrides = configOverrides)
+    consumer.assign(List(tp).asJava)
+
+    val startMs = System.currentTimeMillis()
+    try {
+      consumer.poll(Duration.ofMillis(50))
+      fail()
+    } catch {
+      case _: SaslAuthenticationException =>
+    }
+    val endMs = System.currentTimeMillis()
+    require(endMs - startMs < failedAuthenticationDelayMs, "Failed 
authentication must not be delayed on the client")
+    consumer.close()
+  }
+
+  /**
    * Verifies that any exceptions during authentication with the current 
`clientConfig` are
    * notified with disconnect state `AUTHENTICATE` (and not 
`AUTHENTICATION_FAILED`). This
    * is to ensure that NetworkClient doesn't handle this as a fatal 
authentication failure,
@@ -148,6 +189,6 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
   private def createSelector(): Selector = {
     val channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol,
       JaasContext.Type.CLIENT, new TestSecurityConfig(clientConfig), null, 
kafkaClientSaslMechanism, true)
-    NetworkTestUtils.createSelector(channelBuilder)
+    NetworkTestUtils.createSelector(channelBuilder, time)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index da61149..b598337 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -307,13 +307,14 @@ class SocketServerTest extends JUnitSuite {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, 
listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: 
MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, 
connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config, 
metrics, credentialProvider, memoryPool, new LogContext()) {
-          override protected[network] def connectionId(socket: Socket): String 
= overrideConnectionId
-          override protected[network] def createSelector(channelBuilder: 
ChannelBuilder): Selector = {
-           val testableSelector = new TestableSelector(config, channelBuilder, 
time, metrics)
-           selector = testableSelector
-           testableSelector
-        }
+          config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, 
listenerName, protocol, config, metrics,
+          credentialProvider, memoryPool, new LogContext()) {
+            override protected[network] def connectionId(socket: Socket): 
String = overrideConnectionId
+            override protected[network] def createSelector(channelBuilder: 
ChannelBuilder): Selector = {
+             val testableSelector = new TestableSelector(config, 
channelBuilder, time, metrics)
+             selector = testableSelector
+             testableSelector
+          }
         }
       }
     }
@@ -652,7 +653,8 @@ class SocketServerTest extends JUnitSuite {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, 
listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: 
MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, 
connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config, 
metrics, credentialProvider, MemoryPool.NONE, new LogContext()) {
+          config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, 
listenerName, protocol, config, metrics,
+          credentialProvider, MemoryPool.NONE, new LogContext()) {
           override protected[network] def sendResponse(response: 
RequestChannel.Response, responseSend: Send) {
             conn.close()
             super.sendResponse(response, responseSend)
@@ -697,7 +699,8 @@ class SocketServerTest extends JUnitSuite {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, 
listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: 
MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, 
connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config, 
metrics, credentialProvider, memoryPool, new LogContext()) {
+          config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, 
listenerName, protocol, config, metrics,
+          credentialProvider, memoryPool, new LogContext()) {
           override protected[network] def connectionId(socket: Socket): String 
= overrideConnectionId
           override protected[network] def createSelector(channelBuilder: 
ChannelBuilder): Selector = {
            val testableSelector = new TestableSelector(config, channelBuilder, 
time, metrics)
@@ -743,7 +746,7 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testBrokerSendAfterChannelClosedUpdatesRequestMetrics() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 0)
-    props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "100")
+    props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "110")
     val serverMetrics = new Metrics
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), 
serverMetrics, Time.SYSTEM, credentialProvider)
@@ -1100,10 +1103,8 @@ class SocketServerTest extends JUnitSuite {
 
     override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, 
listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: 
MemoryPool): Processor = {
-      new Processor(id, time, config.socketRequestMaxBytes, requestChannel, 
connectionQuotas,
-        config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, 
credentialProvider, memoryPool, new
-            LogContext()) {
-
+      new Processor(id, time, config.socketRequestMaxBytes, requestChannel, 
connectionQuotas, config.connectionsMaxIdleMs,
+        config.failedAuthenticationDelayMs, listenerName, protocol, config, 
metrics, credentialProvider, memoryPool, new LogContext()) {
         override protected[network] def createSelector(channelBuilder: 
ChannelBuilder): Selector = {
            val testableSelector = new TestableSelector(config, channelBuilder, 
time, metrics)
            assertEquals(None, selector)
@@ -1149,7 +1150,7 @@ class SocketServerTest extends JUnitSuite {
   }
 
   class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, 
time: Time, metrics: Metrics)
-        extends Selector(config.socketRequestMaxBytes, 
config.connectionsMaxIdleMs,
+        extends Selector(config.socketRequestMaxBytes, 
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
             metrics, time, "socket-server", new HashMap, false, true, 
channelBuilder, MemoryPool.NONE, new LogContext()) {
 
     val failures = mutable.Map[SelectorOperation, Exception]()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 927dd1c..b7a8951 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -587,6 +587,7 @@ class KafkaConfigTest {
         case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
           assertPropertyInvalid(getBaseProperties(), name, 
"127.0.0.1:not_a_number")
         case KafkaConfig.ConnectionsMaxIdleMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.FailedAuthenticationDelayMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
 
         case KafkaConfig.NumPartitionsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.LogDirsProp => // ignore string
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index e8d9c30..1870a49 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -170,8 +170,8 @@ object JaasTestUtils {
   }
 
   // Returns the dynamic configuration, using credentials for user #1
-  def clientLoginModule(mechanism: String, keytabLocation: Option[File]): 
String =
-    kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, 
KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword, 
KafkaOAuthBearerUser).toString
+  def clientLoginModule(mechanism: String, keytabLocation: Option[File], 
serviceName: String = serviceName): String =
+    kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, 
KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword, 
KafkaOAuthBearerUser, serviceName).toString
 
   def tokenClientLoginModule(tokenId: String, password: String): String = {
     ScramLoginModule(
@@ -223,10 +223,11 @@ object JaasTestUtils {
   }
 
   // consider refactoring if more mechanisms are added
-  private def kafkaClientModule(mechanism: String, 
+  private def kafkaClientModule(mechanism: String,
       keytabLocation: Option[File], clientPrincipal: String,
       plainUser: String, plainPassword: String, 
-      scramUser: String, scramPassword: String, oauthBearerUser: String): 
JaasModule = {
+      scramUser: String, scramPassword: String,
+      oauthBearerUser: String, serviceName: String = serviceName): JaasModule 
= {
     mechanism match {
       case "GSSAPI" =>
         Krb5LoginModule(

Reply via email to