This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f73e3bcd6a5 KAFKA-19561: Set OP_WRITE interest after SASL reauthentication to resume pending writes (#20258) f73e3bcd6a5 is described below commit f73e3bcd6a5b57dc6bcfdfbcbe1b2a5b015e62b2 Author: Manikumar Reddy <manikumar.re...@gmail.com> AuthorDate: Thu Jul 31 21:59:21 2025 +0530 KAFKA-19561: Set OP_WRITE interest after SASL reauthentication to resume pending writes (#20258) https://issues.apache.org/jira/browse/KAFKA-19561 Addresses a race condition during SASL reauthentication: the server-side `KafkaChannel.send()` queues a response, but the OP_WRITE interest is removed before that response is written, so the pending write is never resumed — resulting in stuck responses and client timeouts. Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> --- .../apache/kafka/common/network/KafkaChannel.java | 10 +++ .../org/apache/kafka/common/network/Selector.java | 1 + .../authenticator/SaslAuthenticatorTest.java | 75 +++++++++++++++++++++- 3 files changed, 85 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index df6ccd67ce5..22c24f8408c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -681,4 +681,14 @@ public class KafkaChannel implements AutoCloseable { public ChannelMetadataRegistry channelMetadataRegistry() { return metadataRegistry; } + + + /** + * Maybe add write interest after re-authentication. This is to ensure that any pending write operation + * is resumed.
+ */ + public void maybeAddWriteInterestAfterReauth() { + if (send != null) + this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 151a0fbbd88..7acf88269ee 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -551,6 +551,7 @@ public class Selector implements Selectable, AutoCloseable { boolean isReauthentication = channel.successfulAuthentications() > 1; if (isReauthentication) { sensors.successfulReauthentication.record(1.0, readyTimeMs); + channel.maybeAddWriteInterestAfterReauth(); if (channel.reauthenticationLatencyMs() == null) log.warn( "Should never happen: re-authentication latency for a re-authenticated channel was null; continuing..."); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index a15414a73d0..0a63c05eb35 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.network.ChannelMetadataRegistry; import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.ConnectionMode; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkTestUtils; import org.apache.kafka.common.network.NioEchoServer; @@ -120,6 +121,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.Semaphore; import 
java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -1856,6 +1858,69 @@ public class SaslAuthenticatorTest { verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUIRED); } + @Test + public void testServerSidePendingSendDuringReauthentication() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Collections.singletonList("PLAIN")); + jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), new HashMap<>()); + jaasConfig.setClientOptions("PLAIN", TestServerCallbackHandler.USERNAME, TestServerCallbackHandler.PASSWORD); + String callbackPrefix = ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("PLAIN"); + saslServerConfigs.put(callbackPrefix + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG, + TestServerCallbackHandler.class.getName()); + server = createEchoServer(securityProtocol); + + String node = "node1"; + try { + createClientConnection(securityProtocol, node); + NetworkTestUtils.waitForChannelReady(selector, node); + server.verifyAuthenticationMetrics(1, 0); + + /* + * Now start the reauthentication on the connection. First, we have to sleep long enough so + * that the next write will cause re-authentication + */ + delay((long) (CONNECTIONS_MAX_REAUTH_MS_VALUE * 1.1)); + server.verifyReauthenticationMetrics(0, 0); + + // block reauthentication from completing + TestServerCallbackHandler.sem.acquire(); + + String prefix = TestUtils.randomString(100); + // send a client request to start a reauthentication.
+ selector.send(new NetworkSend(node, ByteBufferSend.sizePrefixed(ByteBuffer.wrap((prefix + "-0").getBytes(StandardCharsets.UTF_8))))); + // wait till reauthentication is blocked + TestUtils.waitForCondition(() -> { + selector.poll(10L); + return TestServerCallbackHandler.sem.hasQueuedThreads(); + }, 5000, "Reauthentication is not blocked"); + + // Set the client's channel `send` to null to allow setting a new send on the server's selector. + // Without this, NioEchoServer will throw an error while processing the client request, + // since we're manually setting a server side send to simulate the issue. + TestUtils.setFieldValue(selector.channel(node), "send", null); + + // extract the channel id from the server's selector and directly set a send on it. + String channelId = server.selector().channels().get(0).id(); + String payload = prefix + "-1"; + server.selector().send(new NetworkSend(channelId, ByteBufferSend.sizePrefixed(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))))); + // allow reauthentication to complete + TestServerCallbackHandler.sem.release(); + + TestUtils.waitForCondition(() -> { + selector.poll(10L); + for (NetworkReceive receive : selector.completedReceives()) { + assertEquals(payload, new String(Utils.toArray(receive.payload()), StandardCharsets.UTF_8)); + return true; + } + return false; + }, 5000, "Failed Receive the server send after reauthentication"); + + server.verifyReauthenticationMetrics(1, 0); + } finally { + closeClientConnectionIfNecessary(); + } + } + private void verifySslClientAuthForSaslSslListener(boolean useListenerPrefix, SslClientAuth configuredClientAuth) throws Exception { @@ -2311,6 +2376,7 @@ public class SaslAuthenticatorTest { static final String USERNAME = "TestServerCallbackHandler-user"; static final String PASSWORD = "TestServerCallbackHandler-password"; private volatile boolean configured; + public static Semaphore sem = new Semaphore(1); @Override public void configure(Map<String, ?> configs, String 
mechanism, List<AppConfigurationEntry> jaasConfigEntries) { @@ -2324,7 +2390,14 @@ public class SaslAuthenticatorTest { protected boolean authenticate(String username, char[] password) { if (!configured) throw new IllegalStateException("Server callback handler not configured"); - return USERNAME.equals(username) && new String(password).equals(PASSWORD); + try { + sem.acquire(); + return USERNAME.equals(username) && new String(password).equals(PASSWORD); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + sem.release(); + } } }