This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7859bbabe8 KAFKA-18274 Failed to restart controller in testing due to 
closed socket channel [2/2] (#18337)
e7859bbabe8 is described below

commit e7859bbabe868fee8ef57f831397f67d8f845c80
Author: Peter Lee <peterx...@gmail.com>
AuthorDate: Tue Jan 14 03:12:42 2025 +0800

    KAFKA-18274 Failed to restart controller in testing due to closed socket 
channel [2/2] (#18337)
    
    Reviewers: TengYao Chi <kiting...@gmail.com>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../kafka/common/test/PreboundSocketFactoryManager.java  | 15 +++++++++++++++
 .../kafka/common/test/api/ClusterTestExtensionsTest.java | 16 ++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git 
a/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
 
b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
index 5358b211c77..fa082001d64 100644
--- 
a/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
+++ 
b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
@@ -49,7 +49,22 @@ public class PreboundSocketFactoryManager implements 
AutoCloseable {
             ServerSocketChannel socketChannel = 
getSocketForListenerAndMarkAsUsed(
                 nodeId,
                 listenerName);
+
             if (socketChannel != null) {
+                if (socketChannel.isOpen()) {
+                    return socketChannel;
+                }
+                // When restarting components(e.g. controllers, brokers) in 
tests, we want to reuse the same
+                // port that was previously allocated to maintain consistent 
addressing
+                // so the client can reconnect to the same port.
+                // Since those components would close the socket when they are 
restarted,
+                // we need to rebind the socket to the same port.
+                socketAddress = new 
InetSocketAddress(socketAddress.getHostString(), 
socketChannel.socket().getLocalPort());
+                socketChannel = ServerSocketFactory.INSTANCE.openServerSocket(
+                        listenerName,
+                        socketAddress,
+                        listenBacklogSize,
+                        recvBufferSize);
                 return socketChannel;
             }
             return ServerSocketFactory.INSTANCE.openServerSocket(
diff --git 
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
 
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index c680ff004ae..60203649274 100644
--- 
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ 
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.common.test.api;
 
+import kafka.server.ControllerServer;
+
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.Config;
@@ -345,6 +347,20 @@ public class ClusterTestExtensionsTest {
         }
     }
 
+    @ClusterTest(types = {Type.KRAFT})
+    public void testControllerRestart(ClusterInstance cluster) throws 
ExecutionException, InterruptedException {
+        try (Admin admin = cluster.admin()) {
+
+            ControllerServer controller = 
cluster.controllers().values().iterator().next();
+            controller.shutdown();
+            controller.awaitShutdown();
+
+            controller.startup();
+
+            assertEquals(1, 
admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
+        }
+    }
+
     @ClusterTest(
         types = {Type.KRAFT, Type.CO_KRAFT},
         brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,

Reply via email to