This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f73e3bcd6a5 KAFKA-19561: Set OP_WRITE interest after SASL 
reauthentication to resume pending writes (#20258)
f73e3bcd6a5 is described below

commit f73e3bcd6a5b57dc6bcfdfbcbe1b2a5b015e62b2
Author: Manikumar Reddy <manikumar.re...@gmail.com>
AuthorDate: Thu Jul 31 21:59:21 2025 +0530

    KAFKA-19561: Set OP_WRITE interest after SASL reauthentication to resume 
pending writes (#20258)
    
    https://issues.apache.org/jira/browse/KAFKA-19561
    
    Addresses a race condition during SASL reauthentication where the
    server-side `KafkaChannel.send()` queues a response, but OP_WRITE is
    removed before the channel becomes writable — resulting in stuck
    responses and client  timeouts.
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../apache/kafka/common/network/KafkaChannel.java  | 10 +++
 .../org/apache/kafka/common/network/Selector.java  |  1 +
 .../authenticator/SaslAuthenticatorTest.java       | 75 +++++++++++++++++++++-
 3 files changed, 85 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index df6ccd67ce5..22c24f8408c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -681,4 +681,14 @@ public class KafkaChannel implements AutoCloseable {
     public ChannelMetadataRegistry channelMetadataRegistry() {
         return metadataRegistry;
     }
+
+
+    /**
+     * Maybe add write interest after re-authentication. This is to ensure 
that any pending write operation
+     * is resumed.
+     */
+    public void maybeAddWriteInterestAfterReauth() {
+        if (send != null)
+            this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 151a0fbbd88..7acf88269ee 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -551,6 +551,7 @@ public class Selector implements Selectable, AutoCloseable {
                         boolean isReauthentication = 
channel.successfulAuthentications() > 1;
                         if (isReauthentication) {
                             sensors.successfulReauthentication.record(1.0, 
readyTimeMs);
+                            channel.maybeAddWriteInterestAfterReauth();
                             if (channel.reauthenticationLatencyMs() == null)
                                 log.warn(
                                     "Should never happen: re-authentication 
latency for a re-authenticated channel was null; continuing...");
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index a15414a73d0..0a63c05eb35 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -44,6 +44,7 @@ import 
org.apache.kafka.common.network.ChannelMetadataRegistry;
 import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
@@ -120,6 +121,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -1856,6 +1858,69 @@ public class SaslAuthenticatorTest {
         verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUIRED);
     }
 
+    @Test
+    public void testServerSidePendingSendDuringReauthentication() throws 
Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Collections.singletonList("PLAIN"));
+        jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, 
PlainLoginModule.class.getName(), new HashMap<>());
+        jaasConfig.setClientOptions("PLAIN", 
TestServerCallbackHandler.USERNAME, TestServerCallbackHandler.PASSWORD);
+        String callbackPrefix = 
ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("PLAIN");
+        saslServerConfigs.put(callbackPrefix + 
BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
+                TestServerCallbackHandler.class.getName());
+        server = createEchoServer(securityProtocol);
+
+        String node = "node1";
+        try {
+            createClientConnection(securityProtocol, node);
+            NetworkTestUtils.waitForChannelReady(selector, node);
+            server.verifyAuthenticationMetrics(1, 0);
+
+            /*
+             * Now start the reauthentication on the connection. First, we 
have to sleep long enough so
+             * that the next write will cause re-authentication
+             */
+            delay((long) (CONNECTIONS_MAX_REAUTH_MS_VALUE * 1.1));
+            server.verifyReauthenticationMetrics(0, 0);
+
+            // block reauthentication to complete
+            TestServerCallbackHandler.sem.acquire();
+
+            String prefix = TestUtils.randomString(100);
+            // send a client request to start a reauthentication.
+            selector.send(new NetworkSend(node, 
ByteBufferSend.sizePrefixed(ByteBuffer.wrap((prefix + 
"-0").getBytes(StandardCharsets.UTF_8)))));
+            // wait till reauthentication is blocked
+            TestUtils.waitForCondition(() -> {
+                selector.poll(10L);
+                return TestServerCallbackHandler.sem.hasQueuedThreads();
+            }, 5000, "Reauthentication is not blocked");
+
+            // Set the client's channel `send` to null to allow setting a new 
send on the server's selector.
+            // Without this, NioEchoServer will throw an error while 
processing the client request,
+            // since we're manually setting a server side send to simulate the 
issue.
+            TestUtils.setFieldValue(selector.channel(node), "send", null);
+
+            // extract the channel id from the server's selector and directly 
set a send on it.
+            String channelId = server.selector().channels().get(0).id();
+            String payload = prefix + "-1";
+            server.selector().send(new NetworkSend(channelId, 
ByteBufferSend.sizePrefixed(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))));
+            // allow reauthentication to complete
+            TestServerCallbackHandler.sem.release();
+
+            TestUtils.waitForCondition(() -> {
+                selector.poll(10L);
+                for (NetworkReceive receive : selector.completedReceives()) {
+                    assertEquals(payload, new 
String(Utils.toArray(receive.payload()), StandardCharsets.UTF_8));
+                    return true;
+                }
+                return false;
+            }, 5000, "Failed Receive the server send after reauthentication");
+
+            server.verifyReauthenticationMetrics(1, 0);
+        } finally {
+            closeClientConnectionIfNecessary();
+        }
+    }
+
     private void verifySslClientAuthForSaslSslListener(boolean 
useListenerPrefix,
                                                        SslClientAuth 
configuredClientAuth) throws Exception {
 
@@ -2311,6 +2376,7 @@ public class SaslAuthenticatorTest {
         static final String USERNAME = "TestServerCallbackHandler-user";
         static final String PASSWORD = "TestServerCallbackHandler-password";
         private volatile boolean configured;
+        public static Semaphore sem = new Semaphore(1);
 
         @Override
         public void configure(Map<String, ?> configs, String mechanism, 
List<AppConfigurationEntry> jaasConfigEntries) {
@@ -2324,7 +2390,14 @@ public class SaslAuthenticatorTest {
         protected boolean authenticate(String username, char[] password) {
             if (!configured)
                 throw new IllegalStateException("Server callback handler not 
configured");
-            return USERNAME.equals(username) && new 
String(password).equals(PASSWORD);
+            try {
+                sem.acquire();
+                return USERNAME.equals(username) && new 
String(password).equals(PASSWORD);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                sem.release();
+            }
         }
     }
 

Reply via email to