This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5d1bfa0 KAFKA-6950: Delay response to failed client authentication to
prevent potential DoS issues (KIP-306) (#5082)
5d1bfa0 is described below
commit 5d1bfa0665ce0850ee76cee152e397c23ac329a7
Author: Dhruvil Shah <[email protected]>
AuthorDate: Fri Aug 31 04:04:33 2018 -0700
KAFKA-6950: Delay response to failed client authentication to prevent
potential DoS issues (KIP-306) (#5082)
Reviewers: Ron Dagostino <[email protected]>, Ismael Juma
<[email protected]>, Rajini Sivaram <[email protected]>
---
.../common/errors/AuthenticationException.java | 4 +
.../apache/kafka/common/network/Authenticator.java | 9 +-
.../DelayedResponseAuthenticationException.java | 27 +++
.../apache/kafka/common/network/KafkaChannel.java | 27 ++-
.../org/apache/kafka/common/network/Selector.java | 142 ++++++++++++-
.../authenticator/SaslServerAuthenticator.java | 31 ++-
.../kafka/common/network/NetworkTestUtils.java | 36 +++-
.../apache/kafka/common/network/NioEchoServer.java | 12 +-
.../common/network/SslTransportLayerTest.java | 20 +-
.../ClientAuthenticationFailureTest.java | 4 +-
.../SaslAuthenticatorFailureDelayTest.java | 229 +++++++++++++++++++++
.../authenticator/SaslAuthenticatorTest.java | 13 +-
.../main/scala/kafka/network/SocketServer.scala | 3 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 12 ++
.../SaslGssapiSslEndToEndAuthorizationTest.scala | 4 +-
.../scala/integration/kafka/api/SaslSetup.scala | 8 +-
.../kafka/server/GssapiAuthenticationTest.scala | 43 +++-
.../unit/kafka/network/SocketServerTest.scala | 31 +--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
.../scala/unit/kafka/utils/JaasTestUtils.scala | 9 +-
20 files changed, 603 insertions(+), 62 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
index f6458c6..7a05eba 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
@@ -40,6 +40,10 @@ public class AuthenticationException extends ApiException {
super(message);
}
+ public AuthenticationException(Throwable cause) {
+ super(cause);
+ }
+
public AuthenticationException(String message, Throwable cause) {
super(message, cause);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 4e2e727..33c2e90 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -38,6 +38,14 @@ public interface Authenticator extends Closeable {
void authenticate() throws AuthenticationException, IOException;
/**
+ * Perform any processing related to authentication failure. This is
invoked when the channel is about to be closed
+ * because of an {@link AuthenticationException} thrown from a prior
{@link #authenticate()} call.
+ * @throws IOException if read/write fails due to an I/O error
+ */
+ default void handleAuthenticationFailure() throws IOException {
+ }
+
+ /**
* Returns Principal using PrincipalBuilder
*/
KafkaPrincipal principal();
@@ -46,5 +54,4 @@ public interface Authenticator extends Closeable {
* returns true if authentication is complete otherwise returns false;
*/
boolean complete();
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
new file mode 100644
index 0000000..8474426
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.errors.AuthenticationException;
+
+public class DelayedResponseAuthenticationException extends
AuthenticationException {
+ private static final long serialVersionUID = 1L;
+
+ public DelayedResponseAuthenticationException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 1839729..17dc6a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -120,15 +120,22 @@ public class KafkaChannel {
* authentication. For SASL, authentication is performed by {@link
Authenticator#authenticate()}.
*/
public void prepare() throws AuthenticationException, IOException {
+ boolean authenticating = false;
try {
if (!transportLayer.ready())
transportLayer.handshake();
- if (transportLayer.ready() && !authenticator.complete())
+ if (transportLayer.ready() && !authenticator.complete()) {
+ authenticating = true;
authenticator.authenticate();
+ }
} catch (AuthenticationException e) {
// Clients are notified of authentication exceptions to enable
operations to be terminated
// without retries. Other errors are handled as network exceptions
in Selector.
state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED,
e);
+ if (authenticating) {
+ delayCloseOnAuthenticationFailure();
+ throw new DelayedResponseAuthenticationException(e);
+ }
throw e;
}
if (ready())
@@ -237,6 +244,24 @@ public class KafkaChannel {
}
/**
+ * Delay channel close on authentication failure. This will remove all
read/write operations from the channel until
+ * {@link #completeCloseOnAuthenticationFailure()} is called to finish up
the channel close.
+ */
+ private void delayCloseOnAuthenticationFailure() {
+ transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+ }
+
+ /**
+ * Finish up any processing on {@link #prepare()} failure.
+ * @throws IOException
+ */
+ void completeCloseOnAuthenticationFailure() throws IOException {
+ transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+ // Invoke the underlying handler to finish up any processing on
authentication failure
+ authenticator.handleAuthenticationFailure();
+ }
+
+ /**
* Returns true if this channel has been explicitly muted using {@link
KafkaChannel#mute()}
*/
public boolean isMute() {
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 7e32509..806bda7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -86,6 +86,7 @@ import java.util.concurrent.TimeUnit;
public class Selector implements Selectable, AutoCloseable {
public static final long NO_IDLE_TIMEOUT_MS = -1;
+ public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;
private enum CloseMode {
GRACEFUL(true), // process outstanding staged receives,
notify disconnect
@@ -119,8 +120,11 @@ public class Selector implements Selectable, AutoCloseable
{
private final int maxReceiveSize;
private final boolean recordTimePerConnection;
private final IdleExpiryManager idleExpiryManager;
+ private final LinkedHashMap<String, DelayedAuthenticationFailureClose>
delayedClosingChannels;
private final MemoryPool memoryPool;
private final long lowMemThreshold;
+ private final int failedAuthenticationDelayMs;
+
//indicates if the previous call to poll was able to make progress in
reading already-buffered data.
//this is used to prevent tight loops when memory is not available to read
any more data
private boolean madeReadProgressLastPoll = true;
@@ -129,6 +133,8 @@ public class Selector implements Selectable, AutoCloseable {
* Create a new nioSelector
* @param maxReceiveSize Max size in bytes of a single network receive
(use {@link NetworkReceive#UNLIMITED} for no limit)
* @param connectionMaxIdleMs Max idle connection time (use {@link
#NO_IDLE_TIMEOUT_MS} to disable idle timeout)
+ * @param failedAuthenticationDelayMs Minimum time by which failed
authentication response and channel close should be delayed by.
+ * Use {@link
#NO_FAILED_AUTHENTICATION_DELAY} to disable this delay.
* @param metrics Registry for Selector metrics
* @param time Time implementation
* @param metricGrpPrefix Prefix for the group of metrics registered by
Selector
@@ -139,6 +145,7 @@ public class Selector implements Selectable, AutoCloseable {
*/
public Selector(int maxReceiveSize,
long connectionMaxIdleMs,
+ int failedAuthenticationDelayMs,
Metrics metrics,
Time time,
String metricGrpPrefix,
@@ -174,9 +181,40 @@ public class Selector implements Selectable, AutoCloseable
{
this.memoryPool = memoryPool;
this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
this.log = logContext.logger(Selector.class);
+ this.failedAuthenticationDelayMs = failedAuthenticationDelayMs;
+ this.delayedClosingChannels = (failedAuthenticationDelayMs >
NO_FAILED_AUTHENTICATION_DELAY) ? new LinkedHashMap<String,
DelayedAuthenticationFailureClose>() : null;
+ }
+
+ public Selector(int maxReceiveSize,
+ long connectionMaxIdleMs,
+ Metrics metrics,
+ Time time,
+ String metricGrpPrefix,
+ Map<String, String> metricTags,
+ boolean metricsPerConnection,
+ boolean recordTimePerConnection,
+ ChannelBuilder channelBuilder,
+ MemoryPool memoryPool,
+ LogContext logContext) {
+ this(maxReceiveSize, connectionMaxIdleMs,
NO_FAILED_AUTHENTICATION_DELAY, metrics, time, metricGrpPrefix, metricTags,
+ metricsPerConnection, recordTimePerConnection, channelBuilder,
memoryPool, logContext);
}
public Selector(int maxReceiveSize,
+ long connectionMaxIdleMs,
+ int failedAuthenticationDelayMs,
+ Metrics metrics,
+ Time time,
+ String metricGrpPrefix,
+ Map<String, String> metricTags,
+ boolean metricsPerConnection,
+ ChannelBuilder channelBuilder,
+ LogContext logContext) {
+ this(maxReceiveSize, connectionMaxIdleMs, failedAuthenticationDelayMs,
metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false,
channelBuilder, MemoryPool.NONE, logContext);
+ }
+
+
+ public Selector(int maxReceiveSize,
long connectionMaxIdleMs,
Metrics metrics,
Time time,
@@ -185,13 +223,17 @@ public class Selector implements Selectable,
AutoCloseable {
boolean metricsPerConnection,
ChannelBuilder channelBuilder,
LogContext logContext) {
- this(maxReceiveSize, connectionMaxIdleMs, metrics, time,
metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder,
MemoryPool.NONE, logContext);
+ this(maxReceiveSize, connectionMaxIdleMs,
NO_FAILED_AUTHENTICATION_DELAY, metrics, time, metricGrpPrefix, metricTags,
metricsPerConnection, channelBuilder, logContext);
}
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time,
String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time,
metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder,
logContext);
}
+ public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs,
Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder
channelBuilder, LogContext logContext) {
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS,
failedAuthenticationDelayMs, metrics, time, metricGrpPrefix,
Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
+ }
+
/**
* Begin connecting to the given address and add the connection to this
nioSelector associated with the given id
* number.
@@ -435,6 +477,9 @@ public class Selector implements Selectable, AutoCloseable {
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
+ // Close channels that were delayed and are now ready to be closed
+ completeDelayedChannelClose(endIo);
+
// we use the time at the end of select to ensure that we don't close
any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
@@ -457,15 +502,14 @@ public class Selector implements Selectable,
AutoCloseable {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ?
time.nanoseconds() : 0;
+ boolean sendFailed = false;
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
- boolean sendFailed = false;
try {
-
/* complete any connections that have finished their handshake
(either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
@@ -477,8 +521,9 @@ public class Selector implements Selectable, AutoCloseable {
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
- } else
+ } else {
continue;
+ }
}
/* if channel is not ready finish prepare */
@@ -532,7 +577,11 @@ public class Selector implements Selectable, AutoCloseable
{
log.debug("Connection with {} disconnected due to
authentication exception", desc, e);
else
log.warn("Unexpected error from {}; closing connection",
desc, e);
- close(channel, sendFailed ? CloseMode.NOTIFY_ONLY :
CloseMode.GRACEFUL);
+
+ if (e instanceof DelayedResponseAuthenticationException)
+ maybeDelayCloseOnAuthenticationFailure(channel);
+ else
+ close(channel, sendFailed ? CloseMode.NOTIFY_ONLY :
CloseMode.GRACEFUL);
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
@@ -631,6 +680,18 @@ public class Selector implements Selectable, AutoCloseable
{
unmute(channel);
}
+ // package-private for testing
+ void completeDelayedChannelClose(long currentTimeNanos) {
+ if (delayedClosingChannels == null)
+ return;
+
+ while (!delayedClosingChannels.isEmpty()) {
+ DelayedAuthenticationFailureClose delayedClose =
delayedClosingChannels.values().iterator().next();
+ if (!delayedClose.tryClose(currentTimeNanos))
+ break;
+ }
+ }
+
private void maybeCloseOldestConnection(long currentTimeNanos) {
if (idleExpiryManager == null)
return;
@@ -657,6 +718,7 @@ public class Selector implements Selectable, AutoCloseable {
this.completedReceives.clear();
this.connected.clear();
this.disconnected.clear();
+
// Remove closed channels after all their staged receives have been
processed or if a send was requested
for (Iterator<Map.Entry<String, KafkaChannel>> it =
closingChannels.entrySet().iterator(); it.hasNext(); ) {
KafkaChannel channel = it.next().getValue();
@@ -667,6 +729,7 @@ public class Selector implements Selectable, AutoCloseable {
it.remove();
}
}
+
for (String channel : this.failedSends)
this.disconnected.put(channel, ChannelState.FAILED_SEND);
this.failedSends.clear();
@@ -707,6 +770,24 @@ public class Selector implements Selectable, AutoCloseable
{
}
}
+ private void maybeDelayCloseOnAuthenticationFailure(KafkaChannel channel) {
+ DelayedAuthenticationFailureClose delayedClose = new
DelayedAuthenticationFailureClose(channel, failedAuthenticationDelayMs);
+ if (delayedClosingChannels != null)
+ delayedClosingChannels.put(channel.id(), delayedClose);
+ else
+ delayedClose.closeNow();
+ }
+
+ private void handleCloseOnAuthenticationFailure(KafkaChannel channel) {
+ try {
+ channel.completeCloseOnAuthenticationFailure();
+ } catch (Exception e) {
+ log.error("Exception handling close on authentication failure node
{}", channel.id(), e);
+ } finally {
+ close(channel, CloseMode.GRACEFUL);
+ }
+ }
+
/**
* Begin closing this connection.
* If 'closeMode' is `CloseMode.GRACEFUL`, the channel is disconnected
here, but staged receives
@@ -735,10 +816,14 @@ public class Selector implements Selectable,
AutoCloseable {
// stagedReceives will be moved to completedReceives later along
with receives from other channels
closingChannels.put(channel.id(), channel);
log.debug("Tracking closing connection {} to process outstanding
requests", channel.id());
- } else
+ } else {
doClose(channel, closeMode.notifyDisconnect);
+ }
this.channels.remove(channel.id());
+ if (delayedClosingChannels != null)
+ delayedClosingChannels.remove(channel.id());
+
if (idleExpiryManager != null)
idleExpiryManager.remove(channel.id());
}
@@ -1064,6 +1149,46 @@ public class Selector implements Selectable,
AutoCloseable {
}
}
+ /**
+ * Encapsulate a channel that must be closed after a specific delay has
elapsed due to authentication failure.
+ */
+ private class DelayedAuthenticationFailureClose {
+ private final KafkaChannel channel;
+ private final long endTimeNanos;
+ private boolean closed;
+
+ /**
+ * @param channel The channel whose close is being delayed
+ * @param delayMs The amount of time by which the operation should be
delayed
+ */
+ public DelayedAuthenticationFailureClose(KafkaChannel channel, int
delayMs) {
+ this.channel = channel;
+ this.endTimeNanos = time.nanoseconds() + (delayMs * 1000L * 1000L);
+ this.closed = false;
+ }
+
+ /**
+ * Try to close this channel if the delay has expired.
+ * @param currentTimeNanos The current time
+ * @return True if the delay has expired and the channel was closed;
false otherwise
+ */
+ public final boolean tryClose(long currentTimeNanos) {
+ if (endTimeNanos <= currentTimeNanos)
+ closeNow();
+ return closed;
+ }
+
+ /**
+ * Close the channel now, regardless of whether the delay has expired
or not.
+ */
+ public final void closeNow() {
+ if (closed)
+ throw new IllegalStateException("Attempt to close a channel
that has already been closed");
+ handleCloseOnAuthenticationFailure(channel);
+ closed = true;
+ }
+ }
+
// helper class for tracking least recently used connections to enable
idle connection closing
private static class IdleExpiryManager {
private final Map<String, Long> lruConnections;
@@ -1114,4 +1239,9 @@ public class Selector implements Selectable,
AutoCloseable {
boolean isMadeReadProgressLastPoll() {
return madeReadProgressLastPoll;
}
+
+ // package-private for testing
+ Map<?, ?> delayedClosingChannels() {
+ return delayedClosingChannels;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index e8f77a5..43cb0a4 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -118,6 +118,7 @@ public class SaslServerAuthenticator implements
Authenticator {
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
private Send netOutBuffer;
+ private Send authenticationFailureSend = null;
// flag indicating if sasl tokens are sent as Kafka SaslAuthenticate
request/responses
private boolean enableKafkaSaslAuthenticateHeaders;
@@ -295,6 +296,11 @@ public class SaslServerAuthenticator implements
Authenticator {
}
@Override
+ public void handleAuthenticationFailure() throws IOException {
+ sendAuthenticationFailureResponse();
+ }
+
+ @Override
public void close() throws IOException {
if (principalBuilder instanceof Closeable)
Utils.closeQuietly((Closeable) principalBuilder, "principal
builder");
@@ -362,7 +368,7 @@ public class SaslServerAuthenticator implements
Authenticator {
RequestAndSize requestAndSize =
requestContext.parseRequest(requestBuffer);
if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
IllegalSaslStateException e = new
IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + "
during SASL authentication.");
- sendKafkaResponse(requestContext,
requestAndSize.request.getErrorResponse(e));
+ buildResponseOnAuthenticateFailure(requestContext,
requestAndSize.request.getErrorResponse(e));
throw e;
}
if (!apiKey.isVersionSupported(version)) {
@@ -378,7 +384,8 @@ public class SaslServerAuthenticator implements
Authenticator {
ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER
: ByteBuffer.wrap(responseToken);
sendKafkaResponse(requestContext, new
SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
} catch (SaslAuthenticationException e) {
- sendKafkaResponse(requestContext, new
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
+ buildResponseOnAuthenticateFailure(requestContext,
+ new
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
throw e;
} catch (SaslException e) {
KerberosError kerberosError = KerberosError.fromException(e);
@@ -464,7 +471,7 @@ public class SaslServerAuthenticator implements
Authenticator {
return clientMechanism;
} else {
LOG.debug("SASL mechanism '{}' requested by client is not
supported", clientMechanism);
- sendKafkaResponse(context, new
SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
+ buildResponseOnAuthenticateFailure(context, new
SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
throw new UnsupportedSaslMechanismException("Unsupported SASL
mechanism " + clientMechanism);
}
}
@@ -491,6 +498,24 @@ public class SaslServerAuthenticator implements
Authenticator {
}
}
+ /**
+ * Build a {@link Send} response on {@link #authenticate()} failure. The
actual response is sent out when
+ * {@link #sendAuthenticationFailureResponse()} is called.
+ */
+ private void buildResponseOnAuthenticateFailure(RequestContext context,
AbstractResponse response) {
+ authenticationFailureSend = context.buildResponse(response);
+ }
+
+ /**
+ * Send any authentication failure response that may have been previously
built.
+ */
+ private void sendAuthenticationFailureResponse() throws IOException {
+ if (authenticationFailureSend == null)
+ return;
+ sendKafkaResponse(authenticationFailureSend);
+ authenticationFailureSend = null;
+ }
+
private void sendKafkaResponse(RequestContext context, AbstractResponse
response) throws IOException {
sendKafkaResponse(context.buildResponse(response));
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index 5998049..b08c8c1 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,6 +29,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@@ -35,16 +37,22 @@ import org.apache.kafka.test.TestUtils;
* Common utility functions used by transport layer and authenticator tests.
*/
public class NetworkTestUtils {
+ public static NioEchoServer createEchoServer(ListenerName listenerName,
SecurityProtocol securityProtocol,
+ AbstractConfig serverConfig,
CredentialCache credentialCache, Time time) throws Exception {
+ return createEchoServer(listenerName, securityProtocol, serverConfig,
credentialCache, 100, time);
+ }
public static NioEchoServer createEchoServer(ListenerName listenerName,
SecurityProtocol securityProtocol,
- AbstractConfig serverConfig, CredentialCache credentialCache)
throws Exception {
- NioEchoServer server = new NioEchoServer(listenerName,
securityProtocol, serverConfig, "localhost", null, credentialCache);
+ AbstractConfig serverConfig,
CredentialCache credentialCache,
+ int
failedAuthenticationDelayMs, Time time) throws Exception {
+ NioEchoServer server = new NioEchoServer(listenerName,
securityProtocol, serverConfig, "localhost",
+ null, credentialCache, failedAuthenticationDelayMs, time);
server.start();
return server;
}
- public static Selector createSelector(ChannelBuilder channelBuilder) {
- return new Selector(5000, new Metrics(), new MockTime(),
"MetricGroup", channelBuilder, new LogContext());
+ public static Selector createSelector(ChannelBuilder channelBuilder, Time
time) {
+ return new Selector(5000, new Metrics(), time, "MetricGroup",
channelBuilder, new LogContext());
}
public static void checkClientConnection(Selector selector, String node,
int minMessageSize, int messageCount) throws Exception {
@@ -79,19 +87,33 @@ public class NetworkTestUtils {
assertTrue(selector.isChannelReady(node));
}
- public static ChannelState waitForChannelClose(Selector selector, String
node, ChannelState.State channelState)
+ public static ChannelState waitForChannelClose(Selector selector, String
node, ChannelState.State channelState, MockTime mockTime)
throws IOException {
boolean closed = false;
- for (int i = 0; i < 30; i++) {
- selector.poll(1000L);
+ for (int i = 0; i < 300; i++) {
+ selector.poll(100L);
if (selector.channel(node) == null &&
selector.closingChannel(node) == null) {
closed = true;
break;
}
+ if (mockTime != null)
+ mockTime.setCurrentTimeMs(mockTime.milliseconds() + 150);
}
assertTrue("Channel was not closed by timeout", closed);
ChannelState finalState = selector.disconnected().get(node);
assertEquals(channelState, finalState.state());
return finalState;
}
+
+ public static ChannelState waitForChannelClose(Selector selector, String
node, ChannelState.State channelState) throws IOException {
+ return waitForChannelClose(selector, node, channelState, null);
+ }
+
+ public static void completeDelayedChannelClose(Selector selector, long
currentTimeNanos) {
+ selector.completeDelayedChannelClose(currentTimeNanos);
+ }
+
+ public static Map<?, ?> delayedClosingChannels(Selector selector) {
+ return selector.delayedClosingChannels();
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 64b7e4e..bd212fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -27,7 +27,7 @@ import
org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -69,7 +69,13 @@ public class NioEchoServer extends Thread {
private final DelegationTokenCache tokenCache;
public NioEchoServer(ListenerName listenerName, SecurityProtocol
securityProtocol, AbstractConfig config,
- String serverHost, ChannelBuilder channelBuilder, CredentialCache
credentialCache) throws Exception {
+ String serverHost, ChannelBuilder channelBuilder,
CredentialCache credentialCache, Time time) throws Exception {
+ this(listenerName, securityProtocol, config, serverHost,
channelBuilder, credentialCache, 100, time);
+ }
+
+ public NioEchoServer(ListenerName listenerName, SecurityProtocol
securityProtocol, AbstractConfig config,
+ String serverHost, ChannelBuilder channelBuilder,
CredentialCache credentialCache,
+ int failedAuthenticationDelayMs, Time time) throws
Exception {
super("echoserver");
setDaemon(true);
serverSocketChannel = ServerSocketChannel.open();
@@ -89,7 +95,7 @@ public class NioEchoServer extends Thread {
if (channelBuilder == null)
channelBuilder =
ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol,
config, credentialCache, tokenCache);
this.metrics = new Metrics();
- this.selector = new Selector(5000, metrics, new MockTime(),
"MetricGroup", channelBuilder, new LogContext());
+ this.selector = new Selector(5000, failedAuthenticationDelayMs,
metrics, time, "MetricGroup", channelBuilder, new LogContext());
acceptorThread = new AcceptorThread();
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 919c167..6783438 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
@@ -64,6 +63,7 @@ import static org.junit.Assert.fail;
public class SslTransportLayerTest {
private static final int BUFFER_SIZE = 4 * 1024;
+ private static Time time = Time.SYSTEM;
private NioEchoServer server;
private Selector selector;
@@ -82,7 +82,7 @@ public class SslTransportLayerTest {
sslClientConfigs =
clientCertStores.getTrustingConfig(serverCertStores);
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
this.channelBuilder.configure(sslClientConfigs);
- this.selector = new Selector(5000, new Metrics(), new MockTime(),
"MetricGroup", channelBuilder, new LogContext());
+ this.selector = new Selector(5000, new Metrics(), time, "MetricGroup",
channelBuilder, new LogContext());
}
@After
@@ -204,7 +204,7 @@ public class SslTransportLayerTest {
};
serverChannelBuilder.configure(sslServerConfigs);
server = new
NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
SecurityProtocol.SSL,
- new TestSecurityConfig(sslServerConfigs), "localhost",
serverChannelBuilder, null);
+ new TestSecurityConfig(sslServerConfigs), "localhost",
serverChannelBuilder, null, time);
server.start();
createSelector(sslClientConfigs);
@@ -786,7 +786,7 @@ public class SslTransportLayerTest {
channelBuilder.flushFailureAction = flushFailureAction;
channelBuilder.failureIndex = i;
channelBuilder.configure(sslClientConfigs);
- this.selector = new Selector(5000, new Metrics(), new MockTime(),
"MetricGroup", channelBuilder, new LogContext());
+ this.selector = new Selector(5000, new Metrics(), time,
"MetricGroup", channelBuilder, new LogContext());
InetSocketAddress addr = new InetSocketAddress("localhost",
server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -829,7 +829,7 @@ public class SslTransportLayerTest {
serverChannelBuilder.flushDelayCount = i;
server = new
NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
SecurityProtocol.SSL, new
TestSecurityConfig(sslServerConfigs),
- "localhost", serverChannelBuilder, null);
+ "localhost", serverChannelBuilder, null, time);
server.start();
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost",
server.port());
@@ -855,7 +855,7 @@ public class SslTransportLayerTest {
String node = "0";
server = createEchoServer(securityProtocol);
clientChannelBuilder.configure(sslClientConfigs);
- this.selector = new Selector(5000, new Metrics(), new MockTime(),
"MetricGroup", clientChannelBuilder, new LogContext());
+ this.selector = new Selector(5000, new Metrics(), time, "MetricGroup",
clientChannelBuilder, new LogContext());
InetSocketAddress addr = new InetSocketAddress("localhost",
server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -895,7 +895,7 @@ public class SslTransportLayerTest {
ChannelBuilder serverChannelBuilder =
ChannelBuilders.serverChannelBuilder(listenerName,
false, securityProtocol, config, null, null);
server = new NioEchoServer(listenerName, securityProtocol, config,
- "localhost", serverChannelBuilder, null);
+ "localhost", serverChannelBuilder, null, time);
server.start();
InetSocketAddress addr = new InetSocketAddress("localhost",
server.port());
@@ -955,7 +955,7 @@ public class SslTransportLayerTest {
ChannelBuilder serverChannelBuilder =
ChannelBuilders.serverChannelBuilder(listenerName,
false, securityProtocol, config, null, null);
server = new NioEchoServer(listenerName, securityProtocol, config,
- "localhost", serverChannelBuilder, null);
+ "localhost", serverChannelBuilder, null, time);
server.start();
InetSocketAddress addr = new InetSocketAddress("localhost",
server.port());
@@ -1027,12 +1027,12 @@ public class SslTransportLayerTest {
channelBuilder.configureBufferSizes(netReadBufSize, netWriteBufSize,
appBufSize);
this.channelBuilder = channelBuilder;
this.channelBuilder.configure(sslClientConfigs);
- this.selector = new Selector(5000, new Metrics(), new MockTime(),
"MetricGroup", channelBuilder, new LogContext());
+ this.selector = new Selector(5000, new Metrics(), time, "MetricGroup",
channelBuilder, new LogContext());
return selector;
}
private NioEchoServer createEchoServer(ListenerName listenerName,
SecurityProtocol securityProtocol) throws Exception {
- return NetworkTestUtils.createEchoServer(listenerName,
securityProtocol, new TestSecurityConfig(sslServerConfigs), null);
+ return NetworkTestUtils.createEchoServer(listenerName,
securityProtocol, new TestSecurityConfig(sslServerConfigs), null, time);
}
private NioEchoServer createEchoServer(SecurityProtocol securityProtocol)
throws Exception {
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 413997f..2fce4c5 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -48,6 +49,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ClientAuthenticationFailureTest {
+ private static MockTime time = new MockTime(50);
private NioEchoServer server;
private Map<String, Object> saslServerConfigs;
@@ -147,6 +149,6 @@ public class ClientAuthenticationFailureTest {
private NioEchoServer createEchoServer(ListenerName listenerName,
SecurityProtocol securityProtocol) throws Exception {
return NetworkTestUtils.createEchoServer(listenerName,
securityProtocol,
- new TestSecurityConfig(saslServerConfigs), new
CredentialCache());
+ new TestSecurityConfig(saslServerConfigs), new
CredentialCache(), time);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
new file mode 100644
index 0000000..b0dfc7a
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.network.CertStores;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.ChannelState;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.NetworkTestUtils;
+import org.apache.kafka.common.network.NioEchoServer;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class SaslAuthenticatorFailureDelayTest {
+ private static final int BUFFER_SIZE = 4 * 1024;
+ private static MockTime time = new MockTime(50);
+
+ private NioEchoServer server;
+ private Selector selector;
+ private ChannelBuilder channelBuilder;
+ private CertStores serverCertStores;
+ private CertStores clientCertStores;
+ private Map<String, Object> saslClientConfigs;
+ private Map<String, Object> saslServerConfigs;
+ private CredentialCache credentialCache;
+ private long startTimeMs;
+ private final int failedAuthenticationDelayMs;
+
+ public SaslAuthenticatorFailureDelayTest(int failedAuthenticationDelayMs) {
+ this.failedAuthenticationDelayMs = failedAuthenticationDelayMs;
+ }
+
+ @Parameterized.Parameters(name = "failedAuthenticationDelayMs={0}")
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<>();
+ values.add(new Object[]{0});
+ values.add(new Object[]{200});
+ return values;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ LoginManager.closeAll();
+ serverCertStores = new CertStores(true, "localhost");
+ clientCertStores = new CertStores(false, "localhost");
+ saslServerConfigs =
serverCertStores.getTrustingConfig(clientCertStores);
+ saslClientConfigs =
clientCertStores.getTrustingConfig(serverCertStores);
+ credentialCache = new CredentialCache();
+ SaslAuthenticatorTest.TestLogin.loginCount.set(0);
+ startTimeMs = time.milliseconds();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ long now = time.milliseconds();
+ if (server != null)
+ this.server.close();
+ if (selector != null)
+ this.selector.close();
+ if (failedAuthenticationDelayMs != -1)
+ assertTrue("timeSpent: " + (now - startTimeMs), now - startTimeMs
>= failedAuthenticationDelayMs);
+ }
+
+ /**
+ * Tests that SASL/PLAIN clients with invalid password fail authentication.
+ */
+ @Test
+ public void testInvalidPasswordSaslPlain() throws Exception {
+ String node = "0";
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ TestJaasConfig jaasConfig = configureMechanisms("PLAIN",
Arrays.asList("PLAIN"));
+ jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME,
"invalidpassword");
+
+ server = createEchoServer(securityProtocol);
+ createAndCheckClientAuthenticationFailure(securityProtocol, node,
"PLAIN",
+ "Authentication failed: Invalid username or password");
+ server.verifyAuthenticationMetrics(0, 1);
+ }
+
+ /**
+ * Tests client connection close before response for authentication
failure is sent.
+ */
+ @Test
+ public void testClientConnectionClose() throws Exception {
+ String node = "0";
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ TestJaasConfig jaasConfig = configureMechanisms("PLAIN",
Arrays.asList("PLAIN"));
+ jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME,
"invalidpassword");
+
+ server = createEchoServer(securityProtocol);
+ createClientConnection(securityProtocol, node);
+
+ Map<?, ?> delayedClosingChannels =
NetworkTestUtils.delayedClosingChannels(server.selector());
+
+ // Wait until server has established connection with client and has
processed the auth failure
+ TestUtils.waitForCondition(() -> {
+ poll(selector);
+ return !server.selector().channels().isEmpty();
+ }, "Timeout waiting for connection");
+ TestUtils.waitForCondition(() -> {
+ poll(selector);
+ return failedAuthenticationDelayMs == 0 ||
!delayedClosingChannels.isEmpty();
+ }, "Timeout waiting for auth failure");
+
+ selector.close();
+ selector = null;
+
+ // Now that client connection is closed, wait until server notices the
disconnection and removes it from the
+ // list of connected channels and from delayed response for auth
failure
+ TestUtils.waitForCondition(() -> failedAuthenticationDelayMs == 0 ||
delayedClosingChannels.isEmpty(),
+ "Timeout waiting for delayed response remove");
+ TestUtils.waitForCondition(() ->
server.selector().channels().isEmpty(),
+ "Timeout waiting for connection close");
+
+ // Try forcing completion of delayed channel close
+ TestUtils.waitForCondition(() -> time.milliseconds() > startTimeMs +
failedAuthenticationDelayMs + 1,
+ "Timeout when waiting for auth failure response timeout to
elapse");
+ NetworkTestUtils.completeDelayedChannelClose(server.selector(),
time.nanoseconds());
+ }
+
+ private void poll(Selector selector) {
+ try {
+ selector.poll(50);
+ } catch (IOException e) {
+ Assert.fail("Caught unexpected exception " + e);
+ }
+ }
+
+ private TestJaasConfig configureMechanisms(String clientMechanism,
List<String> serverMechanisms) {
+ saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism);
+
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
serverMechanisms);
+ if (serverMechanisms.contains("DIGEST-MD5")) {
+ saslServerConfigs.put("digest-md5." +
BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS,
+
TestDigestLoginModule.DigestServerCallbackHandler.class.getName());
+ }
+ return TestJaasConfig.createConfiguration(clientMechanism,
serverMechanisms);
+ }
+
+ private void createSelector(SecurityProtocol securityProtocol, Map<String,
Object> clientConfigs) {
+ if (selector != null) {
+ selector.close();
+ selector = null;
+ }
+
+ String saslMechanism = (String)
saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
+ this.channelBuilder =
ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
+ new TestSecurityConfig(clientConfigs), null, saslMechanism,
true);
+ this.selector = NetworkTestUtils.createSelector(channelBuilder, time);
+ }
+
+ private NioEchoServer createEchoServer(SecurityProtocol securityProtocol)
throws Exception {
+ return
createEchoServer(ListenerName.forSecurityProtocol(securityProtocol),
securityProtocol);
+ }
+
+ private NioEchoServer createEchoServer(ListenerName listenerName,
SecurityProtocol securityProtocol) throws Exception {
+ if (failedAuthenticationDelayMs != -1)
+ return NetworkTestUtils.createEchoServer(listenerName,
securityProtocol,
+ new TestSecurityConfig(saslServerConfigs),
credentialCache, failedAuthenticationDelayMs, time);
+ else
+ return NetworkTestUtils.createEchoServer(listenerName,
securityProtocol,
+ new TestSecurityConfig(saslServerConfigs),
credentialCache, time);
+ }
+
+ private void createClientConnection(SecurityProtocol securityProtocol,
String node) throws Exception {
+ createSelector(securityProtocol, saslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("127.0.0.1",
server.port());
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+ }
+
+ private void createAndCheckClientAuthenticationFailure(SecurityProtocol
securityProtocol, String node,
+ String mechanism,
String expectedErrorMessage) throws Exception {
+ ChannelState finalState =
createAndCheckClientConnectionFailure(securityProtocol, node);
+ Exception exception = finalState.exception();
+ assertTrue("Invalid exception class " + exception.getClass(),
exception instanceof SaslAuthenticationException);
+ if (expectedErrorMessage == null)
+ expectedErrorMessage = "Authentication failed due to invalid
credentials with SASL mechanism " + mechanism;
+ assertEquals(expectedErrorMessage, exception.getMessage());
+ }
+
+ private ChannelState
createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String
node)
+ throws Exception {
+ createClientConnection(securityProtocol, node);
+ ChannelState finalState =
NetworkTestUtils.waitForChannelClose(selector, node,
+ ChannelState.State.AUTHENTICATION_FAILED, time);
+ selector.close();
+ selector = null;
+ return finalState;
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index b8894f1..74058eb 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -92,6 +92,7 @@ import
org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import
org.apache.kafka.common.security.authenticator.TestDigestLoginModule.DigestServerCallbackHandler;
import
org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
+import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -107,6 +108,7 @@ import static org.junit.Assert.fail;
public class SaslAuthenticatorTest {
private static final int BUFFER_SIZE = 4 * 1024;
+ private static Time time = Time.SYSTEM;
private NioEchoServer server;
private Selector selector;
@@ -1308,7 +1310,7 @@ public class SaslAuthenticatorTest {
};
serverChannelBuilder.configure(saslServerConfigs);
server = new NioEchoServer(listenerName, securityProtocol, new
TestSecurityConfig(saslServerConfigs),
- "localhost", serverChannelBuilder, credentialCache);
+ "localhost", serverChannelBuilder, credentialCache, time);
server.start();
return server;
}
@@ -1347,7 +1349,7 @@ public class SaslAuthenticatorTest {
}
};
clientChannelBuilder.configure(saslClientConfigs);
- this.selector = NetworkTestUtils.createSelector(clientChannelBuilder);
+ this.selector = NetworkTestUtils.createSelector(clientChannelBuilder,
time);
InetSocketAddress addr = new InetSocketAddress("127.0.0.1",
server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
}
@@ -1452,7 +1454,7 @@ public class SaslAuthenticatorTest {
String saslMechanism = (String)
saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
this.channelBuilder =
ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
new TestSecurityConfig(clientConfigs), null, saslMechanism,
true);
- this.selector = NetworkTestUtils.createSelector(channelBuilder);
+ this.selector = NetworkTestUtils.createSelector(channelBuilder, time);
}
private NioEchoServer createEchoServer(SecurityProtocol securityProtocol)
throws Exception {
@@ -1461,7 +1463,7 @@ public class SaslAuthenticatorTest {
private NioEchoServer createEchoServer(ListenerName listenerName,
SecurityProtocol securityProtocol) throws Exception {
return NetworkTestUtils.createEchoServer(listenerName,
securityProtocol,
- new TestSecurityConfig(saslServerConfigs), credentialCache);
+ new TestSecurityConfig(saslServerConfigs), credentialCache,
time);
}
private void createClientConnection(SecurityProtocol securityProtocol,
String node) throws Exception {
@@ -1490,8 +1492,7 @@ public class SaslAuthenticatorTest {
private ChannelState
createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String
node)
throws Exception {
createClientConnection(securityProtocol, node);
- ChannelState finalState =
NetworkTestUtils.waitForChannelClose(selector, node,
- ChannelState.State.AUTHENTICATION_FAILED);
+ ChannelState finalState =
NetworkTestUtils.waitForChannelClose(selector, node,
ChannelState.State.AUTHENTICATION_FAILED);
selector.close();
selector = null;
return finalState;
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 96feee8..1365f90 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -245,6 +245,7 @@ class SocketServer(val config: KafkaConfig, val metrics:
Metrics, val time: Time
requestChannel,
connectionQuotas,
config.connectionsMaxIdleMs,
+ config.failedAuthenticationDelayMs,
listenerName,
securityProtocol,
config,
@@ -502,6 +503,7 @@ private[kafka] class Processor(val id: Int,
requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
connectionsMaxIdleMs: Long,
+ failedAuthenticationDelayMs: Int,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
@@ -563,6 +565,7 @@ private[kafka] class Processor(val id: Int,
new KSelector(
maxRequestSize,
connectionsMaxIdleMs,
+ failedAuthenticationDelayMs,
metrics,
time,
"socket-server",
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 334d496..753a5b9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -77,6 +77,7 @@ object Defaults {
val MaxConnectionsPerIpOverrides: String = ""
val ConnectionsMaxIdleMs = 10 * 60 * 1000L
val RequestTimeoutMs = 30000
+ val FailedAuthenticationDelayMs = 100
/** ********* Log Configuration ***********/
val NumPartitions = 1
@@ -282,6 +283,7 @@ object KafkaConfig {
val MaxConnectionsPerIpProp = "max.connections.per.ip"
val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
+ val FailedAuthenticationDelayMsProp =
"connection.failed.authentication.delay.ms"
/***************** rack configuration *************/
val RackProp = "broker.rack"
/** ********* Log Configuration ***********/
@@ -537,6 +539,8 @@ object KafkaConfig {
"configured using " + MaxConnectionsPerIpOverridesProp + " property"
val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or
hostname overrides to the default maximum number of connections. An example
value is \"hostName:100,127.0.0.1:200\""
val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket
processor threads close the connections that idle more than this"
+ val FailedAuthenticationDelayMsDoc = "Connection close delay on failed
authentication: this is the time (in milliseconds) by which connection close
will be delayed on authentication failure. " +
+ s"This must be configured to be less than $ConnectionsMaxIdleMsProp to
prevent connection timeout."
/************* Rack Configuration **************/
val RackDoc = "Rack of the broker. This will be used in rack aware
replication assignment for fault tolerance. Examples: `RACK1`, `us-east-1d`"
/** ********* Log Configuration ***********/
@@ -820,6 +824,7 @@ object KafkaConfig {
.define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp,
atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
.define(MaxConnectionsPerIpOverridesProp, STRING,
Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
.define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs,
MEDIUM, ConnectionsMaxIdleMsDoc)
+ .define(FailedAuthenticationDelayMsProp, INT,
Defaults.FailedAuthenticationDelayMs, atLeast(0), LOW,
FailedAuthenticationDelayMsDoc)
/************ Rack Configuration ******************/
.define(RackProp, STRING, null, MEDIUM, RackDoc)
@@ -1101,6 +1106,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
val maxConnectionsPerIpOverrides: Map[String, Int] =
getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp,
getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) =>
(k, v.toInt)}
val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
+ val failedAuthenticationDelayMs =
getInt(KafkaConfig.FailedAuthenticationDelayMsProp)
/***************** rack configuration **************/
val rack = Option(getString(KafkaConfig.RackProp))
@@ -1397,5 +1403,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address
=> Utils.validHostPattern(address))
if (!invalidAddresses.isEmpty)
throw new
IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp}
contains invalid addresses : ${invalidAddresses.mkString(",")}")
+
+ if (connectionsMaxIdleMs >= 0)
+ require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
+
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs
should always be less than" +
+ s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to
prevent failed" +
+ " authentication responses from timing out")
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
index a9b4a60..a630293 100644
---
a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -24,10 +24,10 @@ import scala.collection.immutable.List
class SaslGssapiSslEndToEndAuthorizationTest extends
SaslEndToEndAuthorizationTest {
override val clientPrincipal =
JaasTestUtils.KafkaClientPrincipalUnqualifiedName
override val kafkaPrincipal =
JaasTestUtils.KafkaServerPrincipalUnqualifiedName
-
+
override protected def kafkaClientSaslMechanism = "GSSAPI"
override protected def kafkaServerSaslMechanisms = List("GSSAPI")
-
+
// Configure brokers to require SSL client authentication in order to verify
that SASL_SSL works correctly even if the
// client doesn't have a keystore. We want to cover the scenario where a
broker requires either SSL client
// authentication or SASL authentication with SSL as the transport layer
(but not both).
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 3913212..81de105 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -138,8 +138,12 @@ trait SaslSetup {
props
}
- def jaasClientLoginModule(clientSaslMechanism: String): String =
- JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
+ def jaasClientLoginModule(clientSaslMechanism: String, serviceName:
Option[String] = None): String = {
+ if (serviceName.isDefined)
+ JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile,
serviceName.get)
+ else
+ JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
+ }
def createScramCredentials(zkConnect: String, userName: String, password:
String): Unit = {
val credentials = ScramMechanism.values.map(m =>
s"${m.mechanismName}=[iterations=4096,password=$password]")
diff --git
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 74b2a15..cbe8462 100644
---
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -19,19 +19,25 @@
package kafka.server
import java.net.InetSocketAddress
+import java.time.Duration
import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}
import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
import kafka.utils.TestUtils
import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.SaslAuthenticationException
import org.apache.kafka.common.network._
import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.MockTime
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import scala.collection.JavaConverters._
+
class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
override val serverCount = 1
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -43,10 +49,17 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
private val executor = Executors.newFixedThreadPool(numThreads)
private val clientConfig: Properties = new Properties
private var serverAddr: InetSocketAddress = _
+ private val time = new MockTime(10)
+ val topic = "topic"
+ val part = 0
+ val tp = new TopicPartition(topic, part)
+ private val failedAuthenticationDelayMs = 2000
@Before
override def setUp() {
startSasl(jaasSections(kafkaServerSaslMechanisms,
Option(kafkaClientSaslMechanism), Both))
+ serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
+ serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp,
failedAuthenticationDelayMs.toString)
super.setUp()
serverAddr = new InetSocketAddress("localhost",
servers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))
@@ -55,6 +68,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness
with SaslSetup {
clientConfig.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
jaasClientLoginModule(kafkaClientSaslMechanism))
clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
"5000")
+
+ // create the test topic with all the brokers as replicas
+ createTopic(topic, 2, serverCount)
}
@After
@@ -94,6 +110,31 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
}
/**
+ * Test that when client fails to verify authenticity of the server, the
resulting failed authentication exception
+ * is thrown immediately, and is not affected by
<code>connection.failed.authentication.delay.ms</code>.
+ */
+ @Test
+ def testServerAuthenticationFailure(): Unit = {
+ // Setup client with a non-existent service principal, so that server
authentication fails on the client
+ val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism,
Some("another-kafka-service"))
+ val configOverrides = new Properties()
+ configOverrides.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
clientLoginContext)
+ val consumer = createConsumer(configOverrides = configOverrides)
+ consumer.assign(List(tp).asJava)
+
+ val startMs = System.currentTimeMillis()
+ try {
+ consumer.poll(Duration.ofMillis(50))
+ fail()
+ } catch {
+ case _: SaslAuthenticationException =>
+ }
+ val endMs = System.currentTimeMillis()
+ require(endMs - startMs < failedAuthenticationDelayMs, "Failed
authentication must not be delayed on the client")
+ consumer.close()
+ }
+
+ /**
* Verifies that any exceptions during authentication with the current
`clientConfig` are
* notified with disconnect state `AUTHENTICATE` (and not
`AUTHENTICATION_FAILED`). This
* is to ensure that NetworkClient doesn't handle this as a fatal
authentication failure,
@@ -148,6 +189,6 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
private def createSelector(): Selector = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol,
JaasContext.Type.CLIENT, new TestSecurityConfig(clientConfig), null,
kafkaClientSaslMechanism, true)
- NetworkTestUtils.createSelector(channelBuilder)
+ NetworkTestUtils.createSelector(channelBuilder, time)
}
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index da61149..b598337 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -307,13 +307,14 @@ class SocketServerTest extends JUnitSuite {
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas,
listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool:
MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel,
connectionQuotas,
- config.connectionsMaxIdleMs, listenerName, protocol, config,
metrics, credentialProvider, memoryPool, new LogContext()) {
- override protected[network] def connectionId(socket: Socket): String
= overrideConnectionId
- override protected[network] def createSelector(channelBuilder:
ChannelBuilder): Selector = {
- val testableSelector = new TestableSelector(config, channelBuilder,
time, metrics)
- selector = testableSelector
- testableSelector
- }
+ config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
listenerName, protocol, config, metrics,
+ credentialProvider, memoryPool, new LogContext()) {
+ override protected[network] def connectionId(socket: Socket):
String = overrideConnectionId
+ override protected[network] def createSelector(channelBuilder:
ChannelBuilder): Selector = {
+ val testableSelector = new TestableSelector(config,
channelBuilder, time, metrics)
+ selector = testableSelector
+ testableSelector
+ }
}
}
}
@@ -652,7 +653,8 @@ class SocketServerTest extends JUnitSuite {
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas,
listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool:
MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel,
connectionQuotas,
- config.connectionsMaxIdleMs, listenerName, protocol, config,
metrics, credentialProvider, MemoryPool.NONE, new LogContext()) {
+ config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
listenerName, protocol, config, metrics,
+ credentialProvider, MemoryPool.NONE, new LogContext()) {
override protected[network] def sendResponse(response:
RequestChannel.Response, responseSend: Send) {
conn.close()
super.sendResponse(response, responseSend)
@@ -697,7 +699,8 @@ class SocketServerTest extends JUnitSuite {
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas,
listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool:
MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel,
connectionQuotas,
- config.connectionsMaxIdleMs, listenerName, protocol, config,
metrics, credentialProvider, memoryPool, new LogContext()) {
+ config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
listenerName, protocol, config, metrics,
+ credentialProvider, memoryPool, new LogContext()) {
override protected[network] def connectionId(socket: Socket): String
= overrideConnectionId
override protected[network] def createSelector(channelBuilder:
ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder,
time, metrics)
@@ -743,7 +746,7 @@ class SocketServerTest extends JUnitSuite {
@Test
def testBrokerSendAfterChannelClosedUpdatesRequestMetrics() {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 0)
- props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "100")
+ props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "110")
val serverMetrics = new Metrics
var conn: Socket = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props),
serverMetrics, Time.SYSTEM, credentialProvider)
@@ -1100,10 +1103,8 @@ class SocketServerTest extends JUnitSuite {
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas,
listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool:
MemoryPool): Processor = {
- new Processor(id, time, config.socketRequestMaxBytes, requestChannel,
connectionQuotas,
- config.connectionsMaxIdleMs, listenerName, protocol, config, metrics,
credentialProvider, memoryPool, new
- LogContext()) {
-
+ new Processor(id, time, config.socketRequestMaxBytes, requestChannel,
connectionQuotas, config.connectionsMaxIdleMs,
+ config.failedAuthenticationDelayMs, listenerName, protocol, config,
metrics, credentialProvider, memoryPool, new LogContext()) {
override protected[network] def createSelector(channelBuilder:
ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder,
time, metrics)
assertEquals(None, selector)
@@ -1149,7 +1150,7 @@ class SocketServerTest extends JUnitSuite {
}
class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder,
time: Time, metrics: Metrics)
- extends Selector(config.socketRequestMaxBytes,
config.connectionsMaxIdleMs,
+ extends Selector(config.socketRequestMaxBytes,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
metrics, time, "socket-server", new HashMap, false, true,
channelBuilder, MemoryPool.NONE, new LogContext()) {
val failures = mutable.Map[SelectorOperation, Exception]()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 927dd1c..b7a8951 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -587,6 +587,7 @@ class KafkaConfigTest {
case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
assertPropertyInvalid(getBaseProperties(), name,
"127.0.0.1:not_a_number")
case KafkaConfig.ConnectionsMaxIdleMsProp =>
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+ case KafkaConfig.FailedAuthenticationDelayMsProp =>
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
case KafkaConfig.NumPartitionsProp =>
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogDirsProp => // ignore string
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index e8d9c30..1870a49 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -170,8 +170,8 @@ object JaasTestUtils {
}
// Returns the dynamic configuration, using credentials for user #1
- def clientLoginModule(mechanism: String, keytabLocation: Option[File]):
String =
- kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal,
KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword,
KafkaOAuthBearerUser).toString
+ def clientLoginModule(mechanism: String, keytabLocation: Option[File],
serviceName: String = serviceName): String =
+ kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal,
KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword,
KafkaOAuthBearerUser, serviceName).toString
def tokenClientLoginModule(tokenId: String, password: String): String = {
ScramLoginModule(
@@ -223,10 +223,11 @@ object JaasTestUtils {
}
// consider refactoring if more mechanisms are added
- private def kafkaClientModule(mechanism: String,
+ private def kafkaClientModule(mechanism: String,
keytabLocation: Option[File], clientPrincipal: String,
plainUser: String, plainPassword: String,
- scramUser: String, scramPassword: String, oauthBearerUser: String):
JaasModule = {
+ scramUser: String, scramPassword: String,
+ oauthBearerUser: String, serviceName: String = serviceName): JaasModule
= {
mechanism match {
case "GSSAPI" =>
Krb5LoginModule(